diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 614e7c9974..3d15e8b087 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -55,7 +55,7 @@ enum { enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, - STREAM_INPUT__TABLE_SCAN, + // STREAM_INPUT__TABLE_SCAN, STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__TRIGGER, diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 9b94864a05..0a8e55bb4f 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -124,6 +124,7 @@ typedef struct SWal { typedef struct { int8_t scanUncommited; int8_t scanMeta; + int8_t enableRef; } SWalFilterCond; typedef struct { diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 8ad42811d4..680fa6736b 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -305,18 +305,23 @@ function install_lib() { ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/libtaos.so.${verNumber} - ${csudo}cp ${binary_dir}/build/lib/libtaosws.so \ - ${install_main_dir}/driver && - ${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so - ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so - ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || : - if [ -d "${lib64_link_dir}" ]; then - ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 - ${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so - ${csudo}ln -sf ${lib64_link_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || : + ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 + ${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so + fi + + if [ -f ${binary_dir}/build/lib/libtaosws.so ]; then + ${csudo}cp ${binary_dir}/build/lib/libtaosws.so \ + ${install_main_dir}/driver && + ${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so ||: + + ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || : + + if [ -d "${lib64_link_dir}" ]; then + ${csudo}ln -sf ${lib64_link_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || : + fi fi else ${csudo}cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib \ @@ -357,26 +362,26 @@ function install_header() { if [ "$osType" != "Darwin" ]; then ${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || : + ${csudo}rm -f ${inc_link_dir}/taosws.h ||: ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \ - ${csudo}rm -f ${inc_link_dir}/taosws.h || : - - ${csudo}cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taosdef.h ${source_dir}/src/inc/taoserror.h \ ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/* - ${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h + if [ -f ${binary_dir}/build/include/taosws.h ]; then + ${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h ||: + ${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h ||: + fi ${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h ${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h ${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h - ${csudo}ln -s ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || : else ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \ ${install_main_dir}/include || ${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \ ${install_main_2_dir}/include && - ${csudo}chmod 644 ${install_main_dir}/include/* || + ${csudo}chmod 644 ${install_main_dir}/include/* ||: ${csudo}chmod 644 ${install_main_2_dir}/include/* fi } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 03f3ef95c5..110b839216 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -901,6 +901,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); return NULL; } @@ -917,6 +919,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->delayedTask = taosOpenQueue(); if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); goto FAIL; } @@ -943,12 +947,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // init semaphore if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { + tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); goto FAIL; } // init connection pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { + tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); tsem_destroy(&pTmq->rspSem); goto FAIL; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 98cd6e861e..496c7beb47 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -55,7 +55,7 @@ int32_t tsNumOfMnodeQueryThreads = 2; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 1; +int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeMergeThreads = 2; @@ -190,7 +190,6 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 60; - void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tsDiskCfg[index].level = level; @@ -468,7 +467,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; @@ -632,7 +631,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { return 0; } -int32_t taosSetCfg(SConfig *pCfg, char* name) { +int32_t taosSetCfg(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); @@ -662,7 +661,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32; } else if (strcasecmp("countAlwaysReturnValue", name) == 0) { tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; - } else if (strcasecmp("cDebugFlag", name) == 0) { + } else if (strcasecmp("cDebugFlag", name) == 0) { cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32; } break; @@ -687,10 +686,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); - + char defaultFirstEp[TSDB_EP_LEN] = {0}; snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); - + SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp"); SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); @@ -700,10 +699,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); - + char defaultFirstEp[TSDB_EP_LEN] = {0}; snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); - + SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp"); SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); @@ -774,7 +773,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { } else if (strcasecmp("minSlidingTime", name) == 0) { tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32; } else if (strcasecmp("minIntervalTime", name) == 0) { - tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; + tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; } else if (strcasecmp("minimalLogDirGB", name) == 0) { tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval; } @@ -920,10 +919,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); - + char defaultFirstEp[TSDB_EP_LEN] = {0}; snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort); - + SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp"); SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); @@ -995,14 +994,13 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) { break; } default: - terrno = TSDB_CODE_CFG_NOT_FOUND; + terrno = TSDB_CODE_CFG_NOT_FOUND; return -1; } - + return 0; } - int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { if (tsCfg == NULL) osDefaultInit(); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5e7c19ce6a..27b785f4d2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -15,11 +15,11 @@ #define _DEFAULT_SOURCE #include "mndConsumer.h" -#include "mndPrivilege.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" #include "mndOffset.h" +#include "mndPrivilege.h" #include "mndShow.h" #include "mndStb.h" #include "mndSubscribe.h" @@ -435,17 +435,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto SUBSCRIBE_OVER; } -#if 0 - // ref topic to prevent drop - // TODO make topic complete - SMqTopicObj topicObj = {0}; - memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); - topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1; - mInfo("subscribe topic %s by consumer:%" PRId64 ",cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, - topicObj.refConsumerCnt); - if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; -#endif - mndReleaseTopic(pMnode, pTopic); } @@ -472,8 +461,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumerOld->status); - mInfo("receive subscribe request from old consumer:%" PRId64 ", current status: %s", consumerId, - mndConsumerStatusName(status)); + mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d", + consumerId, mndConsumerStatusName(status), newTopicNum); if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -849,12 +838,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { + mDebug("showing consumer %ld no assigned topic, skip", pConsumer->consumerId); sdbRelease(pSdb, pConsumer); continue; } taosRLockLatch(&pConsumer->lock); + mDebug("showing consumer %ld", pConsumer->consumerId); + int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); bool hasTopic = true; if (topicSz == 0) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ae0f7f56a2..3873073f03 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -512,9 +512,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - for (int32_t i = 0; i < 5; i++) { - pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); - } pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); @@ -524,6 +521,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); } for (int32_t i = 0; i < 5; i++) { + pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList); } taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 3ee274ced1..54e46e7b9a 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -174,28 +174,9 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S #endif int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { - if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { - qTaskInfo_t task = pExec->execCol.task[workerId]; - ASSERT(task); - qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false); - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); - } - if (pDataBlock == NULL) break; + ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); - ASSERT(pDataBlock->info.rows != 0); - - tqAddBlockDataToRsp(pDataBlock, pRsp); - if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); - } - pRsp->blockNum++; - } - } else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { pRsp->withSchema = 1; STqReader* pReader = pExec->pExecReader[workerId]; tqReaderSetDataMsg(pReader, pReq, 0); @@ -232,9 +213,11 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR pRsp->blockNum++; } } + if (pRsp->blockNum == 0) { pRsp->skipLogNum++; return -1; } + return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5a1a46a9b8..9f6b12c13a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -60,9 +60,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } - } else if (type == STREAM_INPUT__TABLE_SCAN) { - // do nothing - ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN); + /*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/ + /*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/ } else { ASSERT(0); } @@ -71,6 +70,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +#if 0 int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR; @@ -78,6 +78,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL); } +#endif int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 12fcd103e8..b30800680b 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -299,7 +299,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { } ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - pInfo->blockType = STREAM_INPUT__TABLE_SCAN; + /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7410205d43..407f799496 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1123,6 +1123,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 uidCol[i] = getGroupId(pOperator, uidCol[i]); } } + static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; @@ -1216,13 +1217,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (setBlockIntoRes(pInfo, &ret.data) < 0) { ASSERT(0); } - /*pTaskInfo->streamInfo.lastStatus = ret.offset;*/ + // TODO clean data block if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; - } else { - // data is filtered out, do clean - - /*tDeleteSSDataBlock(&ret.data);*/ } } else if (ret.fetchType == FETCH_TYPE__META) { ASSERT(0); @@ -1240,6 +1237,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); return pResult && pResult->info.rows > 0 ? pResult : NULL; + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { + // TODO scan meta + ASSERT(0); + return NULL; } size_t total = taosArrayGetSize(pInfo->pBlockLists); @@ -1397,6 +1398,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } +static SSDataBlock* doRawScan(SOperatorInfo* pInfo) { + // + return NULL; +} + static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); @@ -1409,6 +1415,19 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { return tableIdList; } +// for subscribing db or stb (not including column), +// if this scan is used, meta data can be return +// and schemas are decided when scanning +SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, + SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) { + // create operator + // create tb reader + // create meta reader + // create tq reader + + return NULL; +} + SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) { @@ -1452,16 +1471,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle) { SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); - STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; + STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; if (pHandle->version > 0) { - pSTInfo->cond.endVersion = pHandle->version; + pTSInfo->cond.endVersion = pHandle->version; } SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); if (pHandle->initTableReader) { - pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; - pSTInfo->dataReader = NULL; - if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) { + pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; + pTSInfo->dataReader = NULL; + if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { ASSERT(0); } } @@ -1475,14 +1494,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->tqReader = pHandle->tqReader; } - if (pSTInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); + if (pTSInfo->interval.interval > 0) { + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark); } else { pInfo->pUpdateInfo = NULL; } pInfo->pTableScanOp = pTableScanOp; - pInfo->interval = pSTInfo->interval; + pInfo->interval = pTSInfo->interval; pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 2d910a85b8..eb0c7f56bd 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -30,19 +30,24 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { pRead->pWal = pWal; pRead->pIdxFile = NULL; pRead->pLogFile = NULL; - pRead->curVersion = -5; + pRead->curVersion = -1; pRead->curFileFirstVer = -1; pRead->curInvalid = 1; pRead->capacity = 0; - if (cond) + if (cond) { pRead->cond = *cond; - else { + } else { pRead->cond.scanMeta = 0; pRead->cond.scanUncommited = 0; + pRead->cond.enableRef = 0; } taosThreadMutexInit(&pRead->mutex, NULL); + /*if (pRead->cond.enableRef) {*/ + /*walOpenRef(pWal);*/ + /*}*/ + pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); if (pRead->pHead == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -151,24 +156,8 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { return 0; } -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { +int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) { SWal *pWal = pRead->pWal; - if (!pRead->curInvalid && ver == pRead->curVersion) { - wDebug("wal version %ld match, no need to reset", ver); - return 0; - } - - pRead->curInvalid = 1; - pRead->curVersion = ver; - - if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { - wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, - ver, pWal->vers.firstVer, pWal->vers.lastVer); - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; - } - if (ver < pWal->vers.snapshotVer) { - } SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; @@ -190,6 +179,31 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver); pRead->curVersion = ver; + return 0; +} + +int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { + SWal *pWal = pRead->pWal; + if (!pRead->curInvalid && ver == pRead->curVersion) { + wDebug("wal version %ld match, no need to reset", ver); + return 0; + } + + pRead->curInvalid = 1; + pRead->curVersion = ver; + + if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + ver, pWal->vers.firstVer, pWal->vers.lastVer); + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + if (ver < pWal->vers.snapshotVer) { + } + + if (walReadSeekVerImpl(pRead, ver) < 0) { + return -1; + } return 0; } @@ -198,6 +212,8 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { int64_t contLen; + bool seeked = false; + if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { ASSERT(0); @@ -205,17 +221,26 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { pRead->curInvalid = 1; return -1; } + seeked = true; } - contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); - if (contLen != sizeof(SWalCkHead)) { - if (contLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + while (1) { + contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); + if (contLen == sizeof(SWalCkHead)) { + break; + } else if (contLen == 0 && !seeked) { + walReadSeekVerImpl(pRead, fetchVer); + seeked = true; + continue; } else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + if (contLen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + ASSERT(0); + pRead->curInvalid = 1; + return -1; } - ASSERT(0); - pRead->curInvalid = 1; - return -1; } return 0; } @@ -379,20 +404,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } int32_t walReadVer(SWalReader *pRead, int64_t ver) { - int64_t code; + int64_t contLen; + bool seeked = false; if (pRead->pWal->vers.firstVer == -1) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } - if (pRead->curInvalid || pRead->curVersion != ver) { - if (walReadSeekVer(pRead, ver) < 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); - return -1; - } - } - if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); @@ -400,21 +419,35 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { return -1; } - ASSERT(taosValidFile(pRead->pLogFile) == true); - - code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); - if (code != sizeof(SWalCkHead)) { - if (code < 0) - terrno = TAOS_SYSTEM_ERROR(errno); - else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); + if (pRead->curInvalid || pRead->curVersion != ver) { + if (walReadSeekVer(pRead, ver) < 0) { + wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); + return -1; } - return -1; + seeked = true; } - code = walValidHeadCksum(pRead->pHead); - if (code != 0) { + while (1) { + contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); + if (contLen == sizeof(SWalCkHead)) { + break; + } else if (contLen == 0 && !seeked) { + walReadSeekVerImpl(pRead, ver); + seeked = true; + continue; + } else { + if (contLen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + ASSERT(0); + return -1; + } + } + + contLen = walValidHeadCksum(pRead->pHead); + if (contLen != 0) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; @@ -430,9 +463,9 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { pRead->capacity = pRead->pHead->head.bodyLen; } - if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != + if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != pRead->pHead->head.bodyLen) { - if (code < 0) + if (contLen < 0) terrno = TAOS_SYSTEM_ERROR(errno); else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; @@ -449,8 +482,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { return -1; } - code = walValidBodyCksum(pRead->pHead); - if (code != 0) { + contLen = walValidBodyCksum(pRead->pHead); + if (contLen != 0) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 59908cca8a..5d463e1de9 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -1,25 +1,46 @@ IF (TD_WEBSOCKET) - MESSAGE("${Green} use libtaos-ws${ColourReset}") - IF (TD_LINUX) - include(ExternalProject) - ExternalProject_Add(taosws-rs - PREFIX "taosws-rs" - SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs - BUILD_ALWAYS off - DEPENDS taos - BUILD_IN_SOURCE 1 - CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" - PATCH_COMMAND - COMMAND git clean -f -d - BUILD_COMMAND - COMMAND cargo build --release -p taos-ws-sys - COMMAND ./taos-ws-sys/ci/package.sh - INSTALL_COMMAND - COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib - COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include - COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include - ) - ENDIF() + MESSAGE("${Green} use libtaos-ws${ColourReset}") + IF (TD_LINUX) + IF (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs/target/release/libtaosws.so" OR "${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs/target/release/libtaosws.so" IS_NEWER_THAN "${CMAKE_SOURCE_DIR}/.git/modules/tools/taosws-rs/FETCH_HEAD") + include(ExternalProject) + ExternalProject_Add(taosws-rs + PREFIX "taosws-rs" + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs + BUILD_ALWAYS off + DEPENDS taos + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" + PATCH_COMMAND + COMMAND git clean -f -d + BUILD_COMMAND + COMMAND cargo build --release -p taos-ws-sys + COMMAND ./taos-ws-sys/ci/package.sh + INSTALL_COMMAND + COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib + COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include + COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include + ) + ELSE() + include(ExternalProject) + ExternalProject_Add(taosws-rs + PREFIX "taosws-rs" + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosws-rs + BUILD_ALWAYS on + DEPENDS taos + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" + PATCH_COMMAND + COMMAND git clean -f -d + BUILD_COMMAND + COMMAND cargo build --release -p taos-ws-sys + COMMAND ./taos-ws-sys/ci/package.sh + INSTALL_COMMAND + COMMAND cmake -E copy target/libtaosws/libtaosws.so ${CMAKE_BINARY_DIR}/build/lib + COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include + COMMAND cmake -E copy target/libtaosws/taosws.h ${CMAKE_BINARY_DIR}/build/include + ) + ENDIF () + ENDIF() ENDIF () IF (TD_TAOS_TOOLS) diff --git a/tools/taos-tools b/tools/taos-tools index 9cb71e3c4c..d69d9feaf0 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 9cb71e3c4c0474553aa961cbe19795541c29b5c7 +Subproject commit d69d9feaf067db6af7c49b651f67ba0b0e11a861 diff --git a/tools/taosws-rs b/tools/taosws-rs index 430982a0c2..7a94ffab45 160000 --- a/tools/taosws-rs +++ b/tools/taosws-rs @@ -1 +1 @@ -Subproject commit 430982a0c2c29a819ffc414d11f49f2d424ca3fe +Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6