Merge pull request #27616 from taosdata/fix/TD-30270

feat:[TD-30270] opti close logic in tmq
This commit is contained in:
Pan Wei 2024-10-09 10:04:14 +08:00 committed by GitHub
commit 388148d2e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1247 additions and 1691 deletions

View File

@ -389,7 +389,7 @@ DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES *res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------------ TAOSX -----------------------------------*/
/* ------------------------------ TAOSX INTERFACE -----------------------------------*/
typedef struct tmq_raw_data {
void *raw;
uint32_t raw_len;

View File

@ -4049,37 +4049,39 @@ void tDeleteMqMetaRsp(SMqMetaRsp* pRsp);
#define MQ_DATA_RSP_VERSION 100
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
STqOffsetVal rspOffset;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
} SMqDataRspCommon;
struct {
SMqRspHead head;
STqOffsetVal rspOffset;
STqOffsetVal reqOffset;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
};
union{
struct{
int64_t sleepTime;
};
struct{
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
};
};
typedef struct {
SMqDataRspCommon common;
int64_t sleepTime;
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const void* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteMqDataRsp(void* pRsp);
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
typedef struct {
SMqDataRspCommon common;
int32_t createTableNum;
SArray* createTableLen;
SArray* createTableReq;
} STaosxRsp;
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteSTaosxRsp(void* pRsp);
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteSTaosxRsp(SMqDataRsp* pRsp);
typedef struct SMqBatchMetaRsp {
SMqRspHead head; // not serialize
@ -4164,6 +4166,7 @@ typedef struct {
typedef struct {
SArray* topicPrivileges; // SArray<STopicPrivilege>
int32_t debugFlag;
} SMqHbRsp;
typedef struct {

View File

@ -69,6 +69,8 @@ extern int32_t tdbDebugFlag;
extern int32_t sndDebugFlag;
extern int32_t simDebugFlag;
extern int32_t tqClientDebug;
int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
void taosCloseLog();
void taosResetLog();

View File

@ -226,31 +226,17 @@ typedef struct {
SSchemaWrapper schema;
int32_t resIter;
SReqResultInfo resInfo;
} SMqRspObjCommon;
typedef struct {
SMqRspObjCommon common;
SMqDataRsp rsp;
union{
struct{
SMqRspHead head;
STqOffsetVal rspOffset;
};
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
SMqBatchMetaRsp batchMetaRsp;
};
} SMqRspObj;
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SMqMetaRsp metaRsp;
} SMqMetaRspObj;
typedef struct {
SMqRspObjCommon common;
STaosxRsp rsp;
} SMqTaosxRspObj;
typedef struct {
SMqRspObjCommon common;
SMqBatchMetaRsp rsp;
} SMqBatchMetaRspObj;
typedef struct SReqRelInfo {
uint64_t userRefId;
uint64_t prevRefId;
@ -332,7 +318,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
return (SReqResultInfo*)&msg->common.resInfo;
return (SReqResultInfo*)&msg->resInfo;
}
int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo);

View File

@ -373,25 +373,22 @@ void taos_free_result(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
tDeleteSTaosxRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->common.resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
tDeleteMqDataRsp(&pRsp->rsp);
doFreeReqResultInfo(&pRsp->common.resInfo);
taosMemoryFree(pRsp);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
tDeleteMqMetaRsp(&pRspObj->metaRsp);
taosMemoryFree(pRspObj);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj *pBtRspObj = (SMqBatchMetaRspObj *)res;
tDeleteMqBatchMetaRsp(&pBtRspObj->rsp);
taosMemoryFree(pBtRspObj);
return;
}
SMqRspObj *pRsp = (SMqRspObj *)res;
if (TD_RES_TMQ(res)) {
tDeleteMqDataRsp(&pRsp->dataRsp);
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_METADATA(res)) {
tDeleteSTaosxRsp(&pRsp->dataRsp);
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_META(res)) {
tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (TD_RES_TMQ_BATCH_META(res)) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
}
taosMemoryFree(pRsp);
}
void taos_kill_query(TAOS *taos) {
@ -454,7 +451,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo = NULL;
if (msg->common.resIter == -1) {
if (msg->resIter == -1) {
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) {
return NULL;
}

View File

@ -548,7 +548,7 @@ end:
tDecoderClear(&decoder);
}
static void processAutoCreateTable(STaosxRsp* rsp, char** string) {
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
SDecoder* decoder = NULL;
SVCreateTbReq* pCreateReq = NULL;
int32_t code = 0;
@ -1721,8 +1721,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.common.resIter = -1;
rspObj.common.resType = RES_TYPE__TMQ;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
@ -1730,7 +1730,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
code = tDecodeMqDataRsp(&decoder, &rspObj.dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq data rsp failed");
code = TSDB_CODE_INVALID_MSG;
@ -1754,14 +1754,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgHash);
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
if (!rspObj.rsp.common.withSchema) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
@ -1778,7 +1778,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
RAW_NULL_CHECK(fields);
@ -1805,7 +1805,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
end:
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteMqDataRsp(&rspObj.rsp);
tDeleteMqDataRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@ -1814,7 +1814,7 @@ end:
return code;
}
static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
// find schema data info
int32_t code = 0;
SVCreateTbReq pCreateReq = {0};
@ -1859,7 +1859,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL;
SMqTaosxRspObj rspObj = {0};
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SHashObj* pCreateTbHash = NULL;
@ -1869,8 +1869,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.common.resIter = -1;
rspObj.common.resType = RES_TYPE__TMQ_METADATA;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ_METADATA;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
@ -1879,7 +1879,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
code = tDecodeSTaosxRsp(&decoder, &rspObj.dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
code = TSDB_CODE_INVALID_MSG;
@ -1905,17 +1905,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
RAW_NULL_CHECK(pVgHash);
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash));
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter);
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
if (!rspObj.rsp.common.withSchema) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
if (!tbName) {
SET_ERROR_MSG("block tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
@ -1946,7 +1946,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
@ -1991,7 +1991,7 @@ end:
pIter = taosHashIterate(pCreateTbHash, pIter);
}
taosHashCleanup(pCreateTbHash);
tDeleteSTaosxRsp(&rspObj.rsp);
tDeleteSTaosxRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
@ -2072,31 +2072,28 @@ char* tmq_get_json_meta(TAOS_RES* res) {
return NULL;
}
char* string = NULL;
SMqRspObj* rspObj = (SMqRspObj*)res;
if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res;
char* string = NULL;
processAutoCreateTable(&pMetaDataRspObj->rsp, &string);
return string;
processAutoCreateTable(&rspObj->dataRsp, &string);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj* pBatchMetaRspObj = (SMqBatchMetaRspObj*)res;
char* string = NULL;
processBatchMetaToJson(&pBatchMetaRspObj->rsp, &string);
return string;
processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
} else if (TD_RES_TMQ_META(res)) {
cJSON* pJson = NULL;
processSimpleMeta(&rspObj->metaRsp, &pJson);
string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
} else{
uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
cJSON* pJson = NULL;
processSimpleMeta(&pMetaRspObj->metaRsp, &pJson);
char* string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
uDebug("tmq_get_json_meta string:%s", string);
return string;
}
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
static int32_t getOffSetLen(const void* rsp) {
const SMqDataRspCommon* pRsp = rsp;
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
SEncoder coder = {0};
tEncoderInit(&coder, NULL, 0);
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
@ -2106,9 +2103,8 @@ static int32_t getOffSetLen(const void* rsp) {
return pos;
}
typedef int32_t __encode_func__(SEncoder* pEncoder, const void* pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw) {
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
@ -2157,15 +2153,14 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
if (!raw || !res) {
return TSDB_CODE_INVALID_PARA;
}
SMqRspObj* rspObj = ((SMqRspObj*)res);
if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw = pMetaRspObj->metaRsp.metaRsp;
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
raw->raw = rspObj->metaRsp.metaRsp;
raw->raw_len = rspObj->metaRsp.metaRspLen;
raw->raw_type = rspObj->metaRsp.resMsgType;
uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res)) {
SMqRspObj* rspObj = ((SMqRspObj*)res);
int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw);
int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
if (code != 0) {
uError("tmq get raw type error:%d", terrno);
return code;
@ -2173,9 +2168,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_type = RES_TYPE__TMQ;
uDebug("tmq get raw type data:%p", raw);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res);
int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw);
int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
if (code != 0) {
uError("tmq get raw type error:%d", terrno);
return code;
@ -2183,10 +2176,9 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq get raw type metadata:%p", raw);
} else if (TD_RES_TMQ_BATCH_META(res)) {
SMqBatchMetaRspObj* pBtMetaRspObj = (SMqBatchMetaRspObj*)res;
raw->raw = pBtMetaRspObj->rsp.pMetaBuff;
raw->raw_len = pBtMetaRspObj->rsp.metaBuffLen;
raw->raw_type = RES_TYPE__TMQ_BATCH_META;
raw->raw = rspObj->batchMetaRsp.pMetaBuff;
raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
raw->raw_type = rspObj->resType;
uDebug("tmq get raw batch meta:%p", raw);
} else {
uError("tmq get raw error type:%d", *(int8_t*)res);

File diff suppressed because it is too large Load Diff

View File

@ -542,6 +542,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
@ -1957,7 +1958,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag},
{"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"metaDebugFlag", &metaDebugFlag},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebug", &tqClientDebug},
};
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -8163,6 +8163,7 @@ int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, privilege->noPrivilege));
}
if (tEncodeI32(&encoder, pRsp->debugFlag) < 0) return -1;
tEndEncode(&encoder);
_exit:
@ -8196,6 +8197,10 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &data->noPrivilege));
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pRsp->debugFlag) < 0) return -1;
}
tEndDecode(&decoder);
_exit:
@ -10681,7 +10686,7 @@ int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) { taosMemoryFree(pRsp->metaRsp); }
int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRspCommon *pRsp) {
int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
@ -10711,19 +10716,20 @@ _exit:
return code;
}
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const void *pRsp) {
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, ((SMqDataRsp *)pRsp)->sleepTime));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
return 0;
}
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRspCommon *pRsp) {
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset));
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->blockNum));
if (pRsp->blockNum != 0) {
if ((pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
TAOS_CHECK_EXIT(terrno);
@ -10787,17 +10793,16 @@ _exit:
return code;
}
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) {
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
TAOS_CHECK_RETURN(tDecodeMqDataRspCommon(pDecoder, pRsp));
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &((SMqDataRsp *)pRsp)->sleepTime));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
}
return 0;
}
static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp;
static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
@ -10810,15 +10815,13 @@ static void tDeleteMqDataRspCommon(void *rsp) {
tOffsetDestroy(&pRsp->rspOffset);
}
void tDeleteMqDataRsp(void *rsp) { tDeleteMqDataRspCommon(rsp); }
void tDeleteMqDataRsp(SMqDataRsp *rsp) { tDeleteMqDataRspCommon(rsp); }
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) {
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, rsp));
const STaosxRsp *pRsp = (const STaosxRsp *)rsp;
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->createTableNum));
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
@ -10831,13 +10834,11 @@ _exit:
return code;
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, void *rsp) {
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeMqDataRspCommon(pDecoder, rsp));
STaosxRsp *pRsp = (STaosxRsp *)rsp;
TAOS_CHECK_EXIT(tDecodeMqDataRspCommon(pDecoder, pRsp));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->createTableNum));
if (pRsp->createTableNum) {
if ((pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t))) == NULL) {
@ -10864,10 +10865,9 @@ _exit:
return code;
}
void tDeleteSTaosxRsp(void *rsp) {
tDeleteMqDataRspCommon(rsp);
void tDeleteSTaosxRsp(SMqDataRsp *pRsp) {
tDeleteMqDataRspCommon(pRsp);
STaosxRsp *pRsp = (STaosxRsp *)rsp;
taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree);

View File

@ -244,6 +244,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
}
storeOffsetRows(pMnode, &req, pConsumer);
rsp.debugFlag = tqClientDebug;
code = buildMqHbRsp(pMsg, &rsp);
END:
@ -587,8 +588,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SCMSubscribeReq subscribe = {0};
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
bool ubSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
if(ubSubscribe){
bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
if(unSubscribe){
SMqConsumerObj *pConsumerTmp = NULL;
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){
@ -599,7 +600,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY,
(ubSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
(unSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE),
pMsg, "subscribe");
MND_TMQ_NULL_CHECK(pTrans);

View File

@ -112,14 +112,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void tqDestroyTqHandle(void* data);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp,
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -148,9 +148,9 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
SSubmitTbData* pTableData, int64_t earlyTs, const char* id);

View File

@ -166,14 +166,14 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp dataRsp = {0};
code = tqInitDataRsp(&dataRsp.common, req.reqOffset);
code = tqInitDataRsp(&dataRsp, req.reqOffset);
if (code != 0) {
tqError("tqInitDataRsp failed, code:%d", code);
return;
}
dataRsp.common.blockNum = 0;
dataRsp.blockNum = 0;
char buf[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s,QID:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId);
@ -184,18 +184,18 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tDeleteMqDataRsp(&dataRsp);
}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type,
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
int32_t vgId) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0};
(void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
(void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
(void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
(void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
}
@ -518,7 +518,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
taosRUnLockLatch(&pTq->lock);
SMqDataRsp dataRsp = {0};
code = tqInitDataRsp(&dataRsp.common, req.reqOffset);
code = tqInitDataRsp(&dataRsp, req.reqOffset);
if (code != 0) {
return code;
}
@ -529,10 +529,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
goto END;
}
dataRsp.common.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.common.rspOffset.version = reqOffset.version;
dataRsp.rspOffset.version = reqOffset.version;
} else if (reqOffset.type < 0) {
STqOffset* pOffset = NULL;
code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
@ -543,17 +543,17 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
goto END;
}
dataRsp.common.rspOffset.version = pOffset->val.version;
dataRsp.rspOffset.version = pOffset->val.version;
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
req.subKey, dataRsp.common.rspOffset.version);
req.subKey, dataRsp.rspOffset.version);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.common.rspOffset.version = sver; // not consume yet, set the earliest position
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.common.rspOffset.version = ever;
dataRsp.rspOffset.version = ever;
}
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
dataRsp.common.rspOffset.version);
dataRsp.rspOffset.version);
}
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,

View File

@ -15,7 +15,7 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision) {
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
@ -34,11 +34,11 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOf
return terrno;
}
actualLen += sizeof(SRetrieveTableRspForTmq);
if (taosArrayPush(((SMqDataRspCommon*)pRsp)->blockDataLen, &actualLen) == NULL){
if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){
taosMemoryFree(buf);
return terrno;
}
if (taosArrayPush(((SMqDataRspCommon*)pRsp)->blockData, &buf) == NULL) {
if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
taosMemoryFree(buf);
return terrno;
}
@ -46,18 +46,18 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOf
return TSDB_CODE_SUCCESS;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, void* pRsp) {
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper);
if (pSW == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
if (taosArrayPush(((SMqDataRspCommon*)pRsp)->blockSchema, &pSW) == NULL) {
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
return terrno;
}
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, void* pRsp, int32_t n) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
@ -73,7 +73,8 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, void* pRsp, int32_t
metaReaderClear(&mr);
return terrno;
}
if(taosArrayPush(((SMqDataRspCommon*)pRsp)->blockTbName, &tbName) == NULL){
if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
tqError("failed to push tbName to blockTbName:%s", tbName);
continue;
}
}
@ -143,7 +144,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, line, END);
pRsp->common.blockNum++;
pRsp->blockNum++;
if (pDataBlock == NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
@ -167,7 +168,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, line, END);
pRsp->common.blockNum++;
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
break;
@ -176,8 +177,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
}
tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows);
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
code = qStreamExtractOffset(task, &pRsp->rspOffset);
END:
if (code != 0) {
tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
@ -186,7 +187,7 @@ END:
return code;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
@ -208,7 +209,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->common.withTbName) {
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
@ -221,13 +222,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
return terrno;
}
if (taosArrayPush(pRsp->common.blockTbName, &tbName) == NULL){
if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
continue;
}
}
}
if (pRsp->common.withSchema) {
if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
if (tqAddBlockSchemaToRsp(pExec, pRsp) != 0){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
@ -235,19 +236,19 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
}
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
if(taosArrayPush(pRsp->common.blockSchema, &pSW) == NULL){
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
}
}
if (tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0) {
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
pRsp->common.blockNum++;
pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
@ -281,13 +282,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
code = qStreamExtractOffset(task, &pRsp->rspOffset);
break;
}
if (pRsp->common.blockNum > 0) {
if (pRsp->blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
code = qStreamExtractOffset(task, &pRsp->common.rspOffset);
code = qStreamExtractOffset(task, &pRsp->rspOffset);
break;
}
}
@ -296,7 +297,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqBatc
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
int32_t code = 0;
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
@ -323,7 +324,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, STaosxRsp* pRsp, int3
if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
goto END;
}
if (pRsp->common.withTbName) {
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
if (code != 0) {
@ -381,7 +382,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, STaosxRsp* pRsp, int3
if (pBlock == NULL) {
continue;
}
if (tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
@ -389,11 +390,11 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, STaosxRsp* pRsp, int3
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
if (taosArrayPush(pRsp->common.blockSchema, &pSW) == NULL){
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
pRsp->common.blockNum++;
pRsp->blockNum++;
}
taosArrayDestroy(pBlocks);
@ -405,7 +406,7 @@ END:
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows,
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
int8_t sourceExcluded) {
STqExecHandle* pExec = &pHandle->execHandle;
int32_t code = 0;

View File

@ -20,7 +20,7 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqBatchMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
@ -40,7 +40,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
}
static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
tOffsetCopy(&pRsp->reqOffset, &pOffset);
tOffsetCopy(&pRsp->rspOffset, &pOffset);
@ -116,12 +116,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
SMqDataRsp dataRsp = {0};
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
code = tqInitDataRsp(&dataRsp.common, *pOffsetVal);
code = tqInitDataRsp(&dataRsp, *pOffsetVal);
if (code != 0) {
return code;
}
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.common.rspOffset.version);
pHandle->subKey, vgId, dataRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
@ -145,24 +145,27 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
int code = tqInitDataRsp(&dataRsp.common, *pOffset);
int code = tqInitDataRsp(&dataRsp, *pOffset);
if (code != 0) {
goto end;
}
code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
if (code != 0) {
goto end;
}
code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.common.blockNum == 0) {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
// lock
taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (dataRsp.common.rspOffset.version > ver) { // check if there are data again to avoid lost data
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
@ -170,16 +173,16 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
tOffsetCopy(&dataRsp.common.reqOffset,
pOffset); // reqOffset represents the current date offset, may be changed if wal not exists
// reqOffset represents the current date offset, may be changed if wal not exists
tOffsetCopy(&dataRsp.reqOffset, pOffset);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.rspOffset);
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.common.blockNum, buf, pRequest->reqId, code);
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
return code;
}
@ -208,11 +211,11 @@ static void tDeleteCommon(void* parm) {}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int32_t vgId = TD_VID(pTq->pVnode);
STaosxRsp taosxRsp = {0};
SMqDataRsp taosxRsp = {0};
SMqBatchMetaRsp btMetaRsp = {0};
int32_t code = 0;
TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp.common, *offset));
TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
if (offset->type != TMQ_OFFSET__LOG) {
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
@ -227,13 +230,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.common.blockNum, taosxRsp.common.rspOffset.type,
taosxRsp.common.rspOffset.uid, taosxRsp.common.rspOffset.ts);
if (taosxRsp.common.blockNum > 0) {
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto END;
} else {
tOffsetCopy(offset, &taosxRsp.common.rspOffset);
tOffsetCopy(offset, &taosxRsp.rspOffset);
}
}
@ -264,7 +267,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
goto END;
}
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
@ -278,7 +281,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
@ -387,7 +390,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer + 1);
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
@ -522,7 +525,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll
return 0;
}
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever) {
int32_t len = 0;
int32_t code = 0;

View File

@ -125,6 +125,8 @@ int32_t idxDebugFlag = 131;
int32_t sndDebugFlag = 131;
int32_t simDebugFlag = 131;
int32_t tqClientDebug = 0;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
int64_t dbgSmallWN = 0;

View File

@ -162,7 +162,6 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py
@ -235,6 +234,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ts-4674.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-30270.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py

View File

@ -266,7 +266,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
if totalConsumeRows < expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
@ -287,7 +287,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*2:
if totalConsumeRows < expectrowcnt*2:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
tdLog.exit("tmq consume rows error!")

View File

@ -20,12 +20,9 @@ class actionType(Enum):
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)

View File

@ -0,0 +1,80 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
# updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1}
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def consume_test(self):
tdSql.execute(f'create database if not exists d1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute(f'create topic topic_all as select * from st')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.unsubscribe()
consumer.unsubscribe()
consumer.subscribe(["topic_all"])
consumer.subscribe(["topic_all"])
except TmqError:
tdLog.exit(f"subscribe error")
cnt = 0
try:
while True:
res = consumer.poll(2)
if not res:
break
val = res.value()
if val is None:
print(f"null val")
continue
for block in val:
cnt += len(block.fetchall())
print(f"block {cnt} rows")
finally:
consumer.unsubscribe();
consumer.close()
def run(self):
self.consume_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -1,231 +0,0 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 131, 'clientCfg':clientCfgDict}
def __init__(self):
self.vgroups = 3
self.ctbNum = 10
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 2,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def updateRowsOfConsumer(self, consumerDict, consumerId, totalRowsOfConsumer):
for key in consumerDict:
if key == consumerId:
consumerDict[key] = totalRowsOfConsumer
return
consumerDict[consumerId] = totalRowsOfConsumer
return
def checkClientLog(self, actConsumeTotalRows, numOfConsumer):
# 01931245 TSC consumer:0x5ee20f124420000c process poll rsp, vgId:5, offset:log:3399, blocks:2, rows:6000 vg total:330000 total:654000, reqId:0xa77d2245ae20112
# 01931245 TSC consumer:0x5ee20f124420000c process poll rsp, vgId:7, offset:log:3384, blocks:1, rows:2000 vg total:326000 total:656000, reqId:0xa77d2245b050113
# 01931246 TSC consumer:0x5ee20f124420000d process poll rsp, vgId:6, offset:log:3400, blocks:2, rows:6000 vg total:330000 total:330000, reqId:0xa77d2245b380116
# 01931246 TSC consumer:0x5ee20f124420000d process poll rsp, vgId:6, offset:log:3460, blocks:2, rows:6000 vg total:336000 total:336000, reqId:0xa77d2245b8f011a
# 01931246 TSC consumer:0x5ee20f124420000d process poll rsp, vgId:6, offset:log:3520, blocks:2, rows:6000 vg total:342000 total:342000, reqId:0xa77d2245beb011f
# 01931246 TSC consumer:0x5ee20f124420000d process poll rsp, vgId:6, offset:log:3567, blocks:1, rows:2000 vg total:344000 total:344000, reqId:0xa77d2245c430121
# filter key: process poll rsp, vgId
tdLog.printNoPrefix("======== start filter key info from client log file")
cfgPath = tdCom.getClientCfgPath()
taosLogFile = '%s/../log/taoslog*'%(cfgPath)
filterResultFile = '%s/../log/filter'%(cfgPath)
cmdStr = 'grep -h "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumerDict = {}
for index, line in enumerate(open(filterResultFile,'r')):
# tdLog.info("row[%d]: %s"%(index, line))
valueList = line.split(',')
# for i in range(len(valueList)):
# tdLog.info("index[%d]: %s"%(i, valueList[i]))
# get consumer id
list2 = valueList[0].split(':')
list3 = list2[3].split()
consumerId = list3[0]
print("consumerId: %s"%(consumerId))
# # get vg id
# list2 = valueList[1].split(':')
# vgId = list2[1]
# print("vgId: %s"%(vgId))
# get total rows of a certain consuer
list2 = valueList[6].split(':')
totalRowsOfConsumer = list2[1]
print("totalRowsOfConsumer: %s"%(totalRowsOfConsumer))
# update a certain info
self.updateRowsOfConsumer(consumerDict, consumerId, totalRowsOfConsumer)
# print(consumerDict)
if numOfConsumer != len(consumerDict):
tdLog.info("expect consumer num: %d, act consumer num: %d"%(numOfConsumer, len(consumerDict)))
tdLog.exit("act consumer error!")
# total rows of all consumers
totalRows = 0
for key in consumerDict:
totalRows += int(consumerDict[key])
if totalRows < actConsumeTotalRows:
tdLog.info("expect consume total rows: %d, act consume total rows: %d"%(actConsumeTotalRows, totalRows))
tdLog.exit("act consume rows error!")
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 2,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:500, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 1
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
tdLog.info("two consumers poll rows: %d %d"%(resultList[0], resultList[1]))
tdLog.info("the consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
if not (totalRowsInserted <= actConsumeTotalRows):
tdLog.exit("%d tmq consume rows error!"%consumerId)
self.checkClientLog(actConsumeTotalRows, 2)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -198,7 +198,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
if expectrowcnt > resultList[0]:
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
@ -219,7 +219,7 @@ class TDTestCase:
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
if totalRowsInserted > actConsumeTotalRows:
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -87,7 +87,6 @@ python3 ./test.py -f 7-tmq/ins_topics_test.py
python3 ./test.py -f 7-tmq/tmqMaxTopic.py
python3 ./test.py -f 7-tmq/tmqParamsTest.py
python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
python3 ./test.py -f 7-tmq/tmqClientConsLog.py
python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
python3 ./test.py -f 7-tmq/tmqOffset.py