diff --git a/include/util/tlog.h b/include/util/tlog.h index e80e94de32..1a3b687e40 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -69,7 +69,7 @@ extern int32_t tdbDebugFlag; extern int32_t sndDebugFlag; extern int32_t simDebugFlag; -extern int32_t tqClientDebug; +extern int32_t tqClientDebugFlag; int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc); void taosCloseLog(); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ce701a755c..0233372012 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,12 +24,9 @@ #include "tref.h" #include "ttimer.h" -#define tqFatalC(...) do { if (cDebugFlag & DEBUG_FATAL || tqClientDebug) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebug) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqWarnC(...) do { if (cDebugFlag & DEBUG_WARN || tqClientDebug) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebug) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebug) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqTraceC(...) do { if (cDebugFlag & DEBUG_TRACE || tqClientDebug) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 @@ -957,7 +954,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } } - tqClientDebug = rsp.debugFlag; + tqClientDebugFlag = rsp.debugFlag; tDestroySMqHbRsp(&rsp); END: diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b6fdc2c3c7..ea76adef1a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -542,7 +542,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); @@ -1959,7 +1959,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"metaDebugFlag", &metaDebugFlag}, - {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebug", &tqClientDebug}, + {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebugFlag", &tqClientDebugFlag}, }; static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e80654b65a..0538a5fe83 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -245,7 +245,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } storeOffsetRows(pMnode, &req, pConsumer); - rsp.debugFlag = tqClientDebug; + rsp.debugFlag = tqClientDebugFlag; code = buildMqHbRsp(pMsg, &rsp); END: diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 204311aa98..610ba43673 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -243,7 +243,7 @@ int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, con int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int64_t *createTime); int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); // sma diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1bd4317234..4497651151 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -160,7 +160,7 @@ int32_t metaDropTables(SMeta* pMeta, SArray* tbUids); int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); -SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 484c5c0a16..e2ba8d9ccb 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -371,7 +371,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) { void *pData = NULL; int nData = 0; int64_t version; @@ -407,6 +407,9 @@ _query: } } else if (me.type == TSDB_CHILD_TABLE) { uid = me.ctbEntry.suid; + if (createTime != NULL){ + *createTime = me.ctbEntry.btime; + } tDecoderClear(&dc); goto _query; } else { @@ -617,7 +620,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { STSchema *pTSchema = NULL; SSchemaWrapper *pSW = NULL; - pSW = metaGetTableSchema(pMeta, uid, sver, lock); + pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL); if (!pSW) return NULL; pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 0936d8f092..b2826ec45a 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -552,7 +552,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) { bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); + SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f594c2f229..fedcc0e82d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -263,7 +263,7 @@ bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; } void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); + SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } @@ -669,7 +669,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", @@ -961,10 +961,8 @@ END: return code; } -int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) { tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); - SSDataBlock* block = NULL; - SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pSubmitTbData == NULL) { return terrno; @@ -980,7 +978,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 68702754aa..3e4895378b 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -293,6 +293,54 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat return code; } +static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ + int32_t code = 0; + void* createReq = NULL; + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + if (pRsp->createTableLen == NULL) { + code = terrno; + goto END; + } + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); + if (pRsp->createTableReq == NULL) { + code = terrno; + goto END; + } + } + + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + goto END; + } + createReq = taosMemoryCalloc(1, len); + if (createReq == NULL){ + code = terrno; + goto END; + } + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq); + tEncoderClear(&encoder); + if (code < 0) { + goto END; + } + if (taosArrayPush(pRsp->createTableLen, &len) == NULL){ + code = terrno; + goto END; + } + if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){ + code = terrno; + goto END; + } + pRsp->createTableNum++; + + return 0; +END: + taosMemoryFree(createReq); + return code; +} static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ int32_t code = 0; @@ -312,7 +360,8 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int } SSubmitTbData* pSubmitTbDataRet = NULL; - code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet); + int64_t createTime = INT64_MAX; + code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime); if (code != 0) { tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId); goto END; @@ -330,46 +379,13 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int } } if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - if (pRsp->createTableLen == NULL) { - code = terrno; - goto END; - } - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - if (pRsp->createTableReq == NULL) { - code = terrno; + if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq + code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq); + if (code != 0){ + tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId); goto END; } } - - uint32_t len = 0; - tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); - if (TSDB_CODE_SUCCESS != code) { - goto END; - } - void* createReq = taosMemoryCalloc(1, len); - if (createReq == NULL){ - code = terrno; - goto END; - } - SEncoder encoder = {0}; - tEncoderInit(&encoder, createReq, len); - code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); - tEncoderClear(&encoder); - if (code < 0) { - taosMemoryFree(createReq); - goto END; - } - if (taosArrayPush(pRsp->createTableLen, &len) == NULL){ - taosMemoryFree(createReq); - goto END; - } - if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){ - taosMemoryFree(createReq); - goto END; - } - pRsp->createTableNum++; } if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) { goto END; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 7c6a2e7313..0929953e1c 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -702,7 +702,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { } int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { - SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0); + SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL); if (pSW) { *num = pSW->nCols; tDeleteSchemaWrapper(pSW); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 45c8a2f6c2..2b80eaf85d 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -125,7 +125,7 @@ int32_t idxDebugFlag = 131; int32_t sndDebugFlag = 131; int32_t simDebugFlag = 131; -int32_t tqClientDebug = 0; +int32_t tqClientDebugFlag = 0; int64_t dbgEmptyW = 0; int64_t dbgWN = 0;