enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-16 17:17:44 +08:00
parent 0002ede469
commit 4f39974938
12 changed files with 76 additions and 62 deletions

View File

@ -69,7 +69,7 @@ extern int32_t tdbDebugFlag;
extern int32_t sndDebugFlag;
extern int32_t simDebugFlag;
extern int32_t tqClientDebug;
extern int32_t tqClientDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
void taosCloseLog();

View File

@ -24,12 +24,9 @@
#include "tref.h"
#include "ttimer.h"
#define tqFatalC(...) do { if (cDebugFlag & DEBUG_FATAL || tqClientDebug) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebug) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqWarnC(...) do { if (cDebugFlag & DEBUG_WARN || tqClientDebug) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebug) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebug) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTraceC(...) do { if (cDebugFlag & DEBUG_TRACE || tqClientDebug) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
@ -957,7 +954,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
}
}
tqClientDebug = rsp.debugFlag;
tqClientDebugFlag = rsp.debugFlag;
tDestroySMqHbRsp(&rsp);
END:

View File

@ -542,7 +542,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
@ -1959,7 +1959,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag},
{"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"metaDebugFlag", &metaDebugFlag},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebug", &tqClientDebug},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebugFlag", &tqClientDebugFlag},
};
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -245,7 +245,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
}
storeOffsetRows(pMnode, &req, pConsumer);
rsp.debugFlag = tqClientDebug;
rsp.debugFlag = tqClientDebugFlag;
code = buildMqHbRsp(pMsg, &rsp);
END:

View File

@ -243,7 +243,7 @@ int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, con
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int64_t *createTime);
int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished);
// sma

View File

@ -160,7 +160,7 @@ int32_t metaDropTables(SMeta* pMeta, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime);
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);

View File

@ -371,7 +371,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) {
void *pData = NULL;
int nData = 0;
int64_t version;
@ -407,6 +407,9 @@ _query:
}
} else if (me.type == TSDB_CHILD_TABLE) {
uid = me.ctbEntry.suid;
if (createTime != NULL){
*createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc);
goto _query;
} else {
@ -617,7 +620,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
if (!pSW) return NULL;
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);

View File

@ -552,7 +552,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}

View File

@ -263,7 +263,7 @@ bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; }
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}
@ -669,7 +669,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
(pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
"version %d, possibly dropped table",
@ -961,10 +961,8 @@ END:
return code;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
SSDataBlock* block = NULL;
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return terrno;
@ -980,7 +978,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
pReader->lastBlkUid = uid;
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);

View File

@ -293,6 +293,54 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
return code;
}
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
int32_t code = 0;
void* createReq = NULL;
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
if (pRsp->createTableLen == NULL) {
code = terrno;
goto END;
}
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
if (pRsp->createTableReq == NULL) {
code = terrno;
goto END;
}
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
goto END;
}
createReq = taosMemoryCalloc(1, len);
if (createReq == NULL){
code = terrno;
goto END;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
if (code < 0) {
goto END;
}
if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
code = terrno;
goto END;
}
if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
code = terrno;
goto END;
}
pRsp->createTableNum++;
return 0;
END:
taosMemoryFree(createReq);
return code;
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
int32_t code = 0;
@ -312,7 +360,8 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
}
SSubmitTbData* pSubmitTbDataRet = NULL;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet);
int64_t createTime = INT64_MAX;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
if (code != 0) {
tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
goto END;
@ -330,46 +379,13 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
}
}
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
if (pRsp->createTableLen == NULL) {
code = terrno;
goto END;
}
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
if (pRsp->createTableReq == NULL) {
code = terrno;
if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq
code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
if (code != 0){
tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
goto END;
}
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
goto END;
}
void* createReq = taosMemoryCalloc(1, len);
if (createReq == NULL){
code = terrno;
goto END;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
tEncoderClear(&encoder);
if (code < 0) {
taosMemoryFree(createReq);
goto END;
}
if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
taosMemoryFree(createReq);
goto END;
}
if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
taosMemoryFree(createReq);
goto END;
}
pRsp->createTableNum++;
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
goto END;

View File

@ -702,7 +702,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
}
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
if (pSW) {
*num = pSW->nCols;
tDeleteSchemaWrapper(pSW);

View File

@ -125,7 +125,7 @@ int32_t idxDebugFlag = 131;
int32_t sndDebugFlag = 131;
int32_t simDebugFlag = 131;
int32_t tqClientDebug = 0;
int32_t tqClientDebugFlag = 0;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;