Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot
This commit is contained in:
commit
795a071eb5
|
@ -55,7 +55,7 @@ enum {
|
|||
enum {
|
||||
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||
STREAM_INPUT__DATA_BLOCK,
|
||||
STREAM_INPUT__TABLE_SCAN,
|
||||
// STREAM_INPUT__TABLE_SCAN,
|
||||
STREAM_INPUT__TQ_SCAN,
|
||||
STREAM_INPUT__DATA_RETRIEVE,
|
||||
STREAM_INPUT__TRIGGER,
|
||||
|
|
|
@ -124,6 +124,7 @@ typedef struct SWal {
|
|||
typedef struct {
|
||||
int8_t scanUncommited;
|
||||
int8_t scanMeta;
|
||||
int8_t enableRef;
|
||||
} SWalFilterCond;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -305,18 +305,23 @@ function install_lib() {
|
|||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaos.so.${verNumber}
|
||||
|
||||
${csudo}cp ${binary_dir}/build/lib/libtaosws.so \
|
||||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
|
||||
|
||||
if [ -d "${lib64_link_dir}" ]; then
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || :
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
|
||||
fi
|
||||
|
||||
if [ -f ${binary_dir}/build/lib/libtaosws.so ]; then
|
||||
${csudo}cp ${binary_dir}/build/lib/libtaosws.so \
|
||||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so ||:
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
|
||||
|
||||
if [ -d "${lib64_link_dir}" ]; then
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || :
|
||||
fi
|
||||
fi
|
||||
else
|
||||
${csudo}cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib \
|
||||
|
@ -357,26 +362,26 @@ function install_header() {
|
|||
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
|
||||
${csudo}rm -f ${inc_link_dir}/taosws.h ||:
|
||||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
|
||||
${csudo}rm -f ${inc_link_dir}/taosws.h || :
|
||||
|
||||
${csudo}cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taosdef.h ${source_dir}/src/inc/taoserror.h \
|
||||
${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
|
||||
|
||||
${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h
|
||||
if [ -f ${binary_dir}/build/include/taosws.h ]; then
|
||||
${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h ||:
|
||||
${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h ||:
|
||||
fi
|
||||
|
||||
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
|
||||
|
||||
${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || :
|
||||
else
|
||||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
|
||||
${install_main_dir}/include ||
|
||||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
|
||||
${install_main_2_dir}/include &&
|
||||
${csudo}chmod 644 ${install_main_dir}/include/* ||
|
||||
${csudo}chmod 644 ${install_main_dir}/include/* ||:
|
||||
${csudo}chmod 644 ${install_main_2_dir}/include/*
|
||||
fi
|
||||
}
|
||||
|
|
|
@ -901,6 +901,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
|
||||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||
if (pTmq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -917,6 +919,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
pTmq->delayedTask = taosOpenQueue();
|
||||
|
||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
|
@ -943,12 +947,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
|
||||
// init semaphore
|
||||
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// init connection
|
||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
||||
if (pTmq->pTscObj == NULL) {
|
||||
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||
tsem_destroy(&pTmq->rspSem);
|
||||
goto FAIL;
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ int32_t tsNumOfMnodeQueryThreads = 2;
|
|||
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||
int32_t tsNumOfMnodeReadThreads = 1;
|
||||
int32_t tsNumOfVnodeQueryThreads = 2;
|
||||
int32_t tsNumOfVnodeFetchThreads = 1;
|
||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||
|
@ -190,7 +190,6 @@ int32_t tsMqRebalanceInterval = 2;
|
|||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushInterval = 60;
|
||||
|
||||
|
||||
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
||||
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
||||
tsDiskCfg[index].level = level;
|
||||
|
@ -468,7 +467,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
|
||||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
||||
|
@ -632,7 +631,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||
int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||
int32_t len = strlen(name);
|
||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||
|
@ -662,7 +661,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
||||
} else if (strcasecmp("countAlwaysReturnValue", name) == 0) {
|
||||
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
|
||||
} else if (strcasecmp("cDebugFlag", name) == 0) {
|
||||
} else if (strcasecmp("cDebugFlag", name) == 0) {
|
||||
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
|
||||
}
|
||||
break;
|
||||
|
@ -687,10 +686,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||
SEp firstEp = {0};
|
||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||
|
@ -700,10 +699,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||
SEp firstEp = {0};
|
||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||
|
@ -774,7 +773,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
} else if (strcasecmp("minSlidingTime", name) == 0) {
|
||||
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
||||
} else if (strcasecmp("minIntervalTime", name) == 0) {
|
||||
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
||||
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
||||
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
|
||||
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
|
||||
}
|
||||
|
@ -920,10 +919,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||
|
||||
|
||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||
SEp firstEp = {0};
|
||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||
|
@ -995,14 +994,13 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
|||
break;
|
||||
}
|
||||
default:
|
||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
||||
if (tsCfg == NULL) osDefaultInit();
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndConsumer.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
|
@ -435,17 +435,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
goto SUBSCRIBE_OVER;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// ref topic to prevent drop
|
||||
// TODO make topic complete
|
||||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
||||
mInfo("subscribe topic %s by consumer:%" PRId64 ",cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
|
||||
topicObj.refConsumerCnt);
|
||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||
#endif
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
|
@ -472,8 +461,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
|
||||
int32_t status = atomic_load_32(&pConsumerOld->status);
|
||||
|
||||
mInfo("receive subscribe request from old consumer:%" PRId64 ", current status: %s", consumerId,
|
||||
mndConsumerStatusName(status));
|
||||
mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
|
||||
consumerId, mndConsumerStatusName(status), newTopicNum);
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
|
@ -849,12 +838,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||
if (pShow->pIter == NULL) break;
|
||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
||||
mDebug("showing consumer %ld no assigned topic, skip", pConsumer->consumerId);
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
mDebug("showing consumer %ld", pConsumer->consumerId);
|
||||
|
||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
bool hasTopic = true;
|
||||
if (topicSz == 0) {
|
||||
|
|
|
@ -512,9 +512,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
}
|
||||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||
|
@ -524,6 +521,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
||||
}
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||
}
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
|
|
@ -174,28 +174,9 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
|
|||
#endif
|
||||
|
||||
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||
ASSERT(task);
|
||||
qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
if (pDataBlock == NULL) break;
|
||||
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||
|
||||
ASSERT(pDataBlock->info.rows != 0);
|
||||
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
||||
if (pRsp->withTbName) {
|
||||
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
|
||||
tqAddTbNameToRsp(pTq, uid, pRsp);
|
||||
}
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pRsp->withSchema = 1;
|
||||
STqReader* pReader = pExec->pExecReader[workerId];
|
||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||
|
@ -232,9 +213,11 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
pRsp->blockNum++;
|
||||
}
|
||||
}
|
||||
|
||||
if (pRsp->blockNum == 0) {
|
||||
pRsp->skipLogNum++;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -60,9 +60,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||
taosArrayPush(pInfo->pBlockLists, &p);
|
||||
}
|
||||
} else if (type == STREAM_INPUT__TABLE_SCAN) {
|
||||
// do nothing
|
||||
ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);
|
||||
/*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/
|
||||
/*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -71,6 +70,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
||||
if (tinfo == NULL) {
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
|
@ -78,6 +78,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
||||
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
||||
|
|
|
@ -299,7 +299,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
|||
}
|
||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
pInfo->blockType = STREAM_INPUT__TABLE_SCAN;
|
||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||
int64_t uid = pOffset->uid;
|
||||
int64_t ts = pOffset->ts;
|
||||
|
||||
|
|
|
@ -1123,6 +1123,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
|
|||
uidCol[i] = getGroupId(pOperator, uidCol[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||
|
@ -1216,13 +1217,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
|
||||
// TODO clean data block
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
return pInfo->pRes;
|
||||
} else {
|
||||
// data is filtered out, do clean
|
||||
|
||||
/*tDeleteSSDataBlock(&ret.data);*/
|
||||
}
|
||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||
ASSERT(0);
|
||||
|
@ -1240,6 +1237,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
// TODO scan meta
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
@ -1397,6 +1398,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doRawScan(SOperatorInfo* pInfo) {
|
||||
//
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
||||
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
|
||||
|
||||
|
@ -1409,6 +1415,19 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
|||
return tableIdList;
|
||||
}
|
||||
|
||||
// for subscribing db or stb (not including column),
|
||||
// if this scan is used, meta data can be return
|
||||
// and schemas are decided when scanning
|
||||
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
|
||||
// create operator
|
||||
// create tb reader
|
||||
// create meta reader
|
||||
// create tq reader
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
|
@ -1452,16 +1471,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
if (pHandle) {
|
||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
|
||||
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
|
||||
if (pHandle->version > 0) {
|
||||
pSTInfo->cond.endVersion = pHandle->version;
|
||||
pTSInfo->cond.endVersion = pHandle->version;
|
||||
}
|
||||
|
||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||
if (pHandle->initTableReader) {
|
||||
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pSTInfo->dataReader = NULL;
|
||||
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) {
|
||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pTSInfo->dataReader = NULL;
|
||||
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
@ -1475,14 +1494,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->tqReader = pHandle->tqReader;
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
if (pTSInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
}
|
||||
|
||||
pInfo->pTableScanOp = pTableScanOp;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->interval = pTSInfo->interval;
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
|
|
|
@ -30,19 +30,24 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
|||
pRead->pWal = pWal;
|
||||
pRead->pIdxFile = NULL;
|
||||
pRead->pLogFile = NULL;
|
||||
pRead->curVersion = -5;
|
||||
pRead->curVersion = -1;
|
||||
pRead->curFileFirstVer = -1;
|
||||
pRead->curInvalid = 1;
|
||||
pRead->capacity = 0;
|
||||
if (cond)
|
||||
if (cond) {
|
||||
pRead->cond = *cond;
|
||||
else {
|
||||
} else {
|
||||
pRead->cond.scanMeta = 0;
|
||||
pRead->cond.scanUncommited = 0;
|
||||
pRead->cond.enableRef = 0;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&pRead->mutex, NULL);
|
||||
|
||||
/*if (pRead->cond.enableRef) {*/
|
||||
/*walOpenRef(pWal);*/
|
||||
/*}*/
|
||||
|
||||
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
|
||||
if (pRead->pHead == NULL) {
|
||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||
|
@ -151,24 +156,8 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||
int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
|
||||
SWal *pWal = pRead->pWal;
|
||||
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
||||
wDebug("wal version %ld match, no need to reset", ver);
|
||||
return 0;
|
||||
}
|
||||
|
||||
pRead->curInvalid = 1;
|
||||
pRead->curVersion = ver;
|
||||
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
if (ver < pWal->vers.snapshotVer) {
|
||||
}
|
||||
|
||||
SWalFileInfo tmpInfo;
|
||||
tmpInfo.firstVer = ver;
|
||||
|
@ -190,6 +179,31 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
|||
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
|
||||
|
||||
pRead->curVersion = ver;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||
SWal *pWal = pRead->pWal;
|
||||
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
||||
wDebug("wal version %ld match, no need to reset", ver);
|
||||
return 0;
|
||||
}
|
||||
|
||||
pRead->curInvalid = 1;
|
||||
pRead->curVersion = ver;
|
||||
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
if (ver < pWal->vers.snapshotVer) {
|
||||
}
|
||||
|
||||
if (walReadSeekVerImpl(pRead, ver) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -198,6 +212,8 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
|
|||
|
||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
||||
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||
ASSERT(0);
|
||||
|
@ -205,17 +221,26 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
seeked = true;
|
||||
}
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen != sizeof(SWalCkHead)) {
|
||||
if (contLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
while (1) {
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen == sizeof(SWalCkHead)) {
|
||||
break;
|
||||
} else if (contLen == 0 && !seeked) {
|
||||
walReadSeekVerImpl(pRead, fetchVer);
|
||||
seeked = true;
|
||||
continue;
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
if (contLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
ASSERT(0);
|
||||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(0);
|
||||
pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -379,20 +404,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
}
|
||||
|
||||
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||
int64_t code;
|
||||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
||||
if (pRead->pWal->vers.firstVer == -1) {
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pRead->curInvalid || pRead->curVersion != ver) {
|
||||
if (walReadSeekVer(pRead, ver) < 0) {
|
||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
||||
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
||||
|
@ -400,21 +419,35 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(taosValidFile(pRead->pLogFile) == true);
|
||||
|
||||
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (code != sizeof(SWalCkHead)) {
|
||||
if (code < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
if (pRead->curInvalid || pRead->curVersion != ver) {
|
||||
if (walReadSeekVer(pRead, ver) < 0) {
|
||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
seeked = true;
|
||||
}
|
||||
|
||||
code = walValidHeadCksum(pRead->pHead);
|
||||
if (code != 0) {
|
||||
while (1) {
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen == sizeof(SWalCkHead)) {
|
||||
break;
|
||||
} else if (contLen == 0 && !seeked) {
|
||||
walReadSeekVerImpl(pRead, ver);
|
||||
seeked = true;
|
||||
continue;
|
||||
} else {
|
||||
if (contLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
contLen = walValidHeadCksum(pRead->pHead);
|
||||
if (contLen != 0) {
|
||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
|
@ -430,9 +463,9 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
|||
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||
}
|
||||
|
||||
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||
pRead->pHead->head.bodyLen) {
|
||||
if (code < 0)
|
||||
if (contLen < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
|
@ -449,8 +482,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
code = walValidBodyCksum(pRead->pHead);
|
||||
if (code != 0) {
|
||||
contLen = walValidBodyCksum(pRead->pHead);
|
||||
if (contLen != 0) {
|
||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
|
|
|
@ -1,25 +1,46 @@
|
|||
IF (TD_WEBSOCKET)
|
||||
MESSAGE("${Green} use libtaos-ws${ColourReset}")
|
||||
IF (TD_LINUX)
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(taosws-rs
|
||||
PREFIX "taosws-rs"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs
|
||||
BUILD_ALWAYS off
|
||||
DEPENDS taos
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
|
||||
PATCH_COMMAND
|
||||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo build --release -p taos-ws-sys
|
||||
COMMAND ./taos-ws-sys/ci/package.sh
|
||||
INSTALL_COMMAND
|
||||
COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
|
||||
COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include
|
||||
)
|
||||
ENDIF()
|
||||
MESSAGE("${Green} use libtaos-ws${ColourReset}")
|
||||
IF (TD_LINUX)
|
||||
IF (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs/target/release/libtaosws.so" OR "${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs/target/release/libtaosws.so" IS_NEWER_THAN "${CMAKE_SOURCE_DIR}/.git/modules/tools/taosws-rs/FETCH_HEAD")
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(taosws-rs
|
||||
PREFIX "taosws-rs"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs
|
||||
BUILD_ALWAYS off
|
||||
DEPENDS taos
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
|
||||
PATCH_COMMAND
|
||||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo build --release -p taos-ws-sys
|
||||
COMMAND ./taos-ws-sys/ci/package.sh
|
||||
INSTALL_COMMAND
|
||||
COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
|
||||
COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include
|
||||
)
|
||||
ELSE()
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(taosws-rs
|
||||
PREFIX "taosws-rs"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs
|
||||
BUILD_ALWAYS on
|
||||
DEPENDS taos
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
|
||||
PATCH_COMMAND
|
||||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo build --release -p taos-ws-sys
|
||||
COMMAND ./taos-ws-sys/ci/package.sh
|
||||
INSTALL_COMMAND
|
||||
COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
|
||||
COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include
|
||||
)
|
||||
ENDIF ()
|
||||
ENDIF()
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_TAOS_TOOLS)
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 9cb71e3c4c0474553aa961cbe19795541c29b5c7
|
||||
Subproject commit d69d9feaf067db6af7c49b651f67ba0b0e11a861
|
|
@ -1 +1 @@
|
|||
Subproject commit 430982a0c2c29a819ffc414d11f49f2d424ca3fe
|
||||
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
|
Loading…
Reference in New Issue