Merge pull request #24775 from taosdata/opti/TS-4480

opti:consume performance if all data is in format of "insert into using"
This commit is contained in:
dapan1121 2024-02-20 09:07:59 +08:00 committed by GitHub
commit 980e1b83be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 561 additions and 509 deletions

View File

@ -21,8 +21,8 @@
#include "tglobal.h"
#include "tmsgtype.h"
#define LOG_ID_TAG "connId:0x%"PRIx64",reqId:0x%"PRIx64
#define LOG_ID_VALUE *(int64_t*)taos,pRequest->requestId
#define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
@ -31,8 +31,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
uError("create json object failed")
return NULL;
uError("create json object failed") return NULL;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
@ -56,7 +55,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(column, "type", ctype);
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY|| s->type == TSDB_DATA_TYPE_GEOMETRY) {
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
@ -131,7 +130,8 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
cJSON* colType = cJSON_CreateNumber(field->type);
cJSON_AddItemToObject(json, "colType", colType);
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) {
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
@ -156,7 +156,8 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(field->type);
cJSON_AddItemToObject(json, "colType", colType);
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) {
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
@ -182,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
}
string = cJSON_PrintUnformatted(json);
end:
end:
cJSON_Delete(json);
tFreeSMAltertbReq(&req);
return string;
@ -295,9 +296,9 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON* tvalue = NULL;
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char* buf = NULL;
if(pTagVal->type == TSDB_DATA_TYPE_VARBINARY){
buf = taosMemoryCalloc(pTagVal->nData*2 + 2 + 3, 1);
}else{
if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
buf = taosMemoryCalloc(pTagVal->nData * 2 + 2 + 3, 1);
} else {
buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
}
@ -315,7 +316,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag);
}
end:
end:
cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals);
}
@ -469,7 +470,8 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType);
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
@ -490,7 +492,8 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
cJSON_AddItemToObject(json, "colType", colType);
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
@ -527,9 +530,9 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
}
buf = parseTagDatatoJson(vAlterTbReq.pTagVal);
} else {
if(vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY){
buf = taosMemoryCalloc(vAlterTbReq.nTagVal*2 + 2 + 3, 1);
}else{
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
buf = taosMemoryCalloc(vAlterTbReq.nTagVal * 2 + 2 + 3, 1);
} else {
buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1);
}
dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
@ -677,7 +680,7 @@ _exit:
}
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -692,7 +695,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
terrno = code;
return code;
}
uDebug(LOG_ID_TAG" create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -731,8 +734,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX;
pReq.igExists = true;
uDebug(LOG_ID_TAG" create stable name:%s suid:%" PRId64 " processSuid:%" PRId64,
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
pReq.suid);
STscObj* pTscObj = pRequest->pTscObj;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -766,7 +769,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG" create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder);
@ -775,7 +778,7 @@ end:
}
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -791,7 +794,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
return code;
}
uDebug(LOG_ID_TAG" drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -812,9 +815,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
SName pName = {0};
toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
STableMeta* pTableMeta = NULL;
@ -835,8 +838,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.source = TD_REQ_FROM_TAOX;
// pReq.suid = processSuid(req.suid, pRequest->pDb);
uDebug(LOG_ID_TAG" drop stable name:%s suid:%" PRId64 " new suid:%" PRId64,
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
pReq.suid);
STscObj* pTscObj = pRequest->pTscObj;
SName tableName = {0};
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@ -870,7 +873,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
taosMemoryFree(pCmdMsg.pMsg);
end:
uDebug(LOG_ID_TAG" drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
destroyRequest(pRequest);
tDecoderClear(&coder);
terrno = code;
@ -889,7 +892,7 @@ static void destroyCreateTbReqBatch(void* data) {
}
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -939,9 +942,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
@ -1033,7 +1036,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
end:
uDebug(LOG_ID_TAG" create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@ -1062,7 +1065,7 @@ static void destroyDropTbReqBatch(void* data) {
}
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -1111,9 +1114,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@ -1142,7 +1145,8 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
tb_uid_t oldSuid = pDropReq->suid;
pDropReq->suid = pTableMeta->suid;
taosMemoryFreeClear(pTableMeta);
uDebug(LOG_ID_TAG" drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid, pDropReq->suid);
uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
pDropReq->suid);
taosArrayPush(pRequest->tableList, &pName);
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
@ -1185,7 +1189,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code;
end:
uDebug(LOG_ID_TAG" drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest);
tDecoderClear(&coder);
@ -1227,16 +1231,16 @@ end:
//}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
SDeleteRes req = {0};
SDecoder coder = {0};
SDeleteRes req = {0};
SDecoder coder = {0};
char sql[256] = {0};
int32_t code = TSDB_CODE_SUCCESS;
uDebug("connId:0x%"PRIx64" delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
@ -1260,14 +1264,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
taos_free_result(res);
end:
uDebug("connId:0x%"PRIx64" delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
tDecoderClear(&coder);
terrno = code;
return code;
}
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
if(taos == NULL || meta == NULL) {
if (taos == NULL || meta == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -1312,9 +1316,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0};
SName pName = {0};
@ -1394,8 +1398,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
}
int taos_write_raw_block_with_fields_with_reqid(TAOS *taos, int rows, char *pData, const char *tbname,
TAOS_FIELD *fields, int numFields, int64_t reqid){
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
TAOS_FIELD* fields, int numFields, int64_t reqid) {
if (!taos || !pData || !tbname) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
@ -1410,8 +1414,8 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS *taos, int rows, char *pDat
return terrno;
}
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d",
LOG_ID_VALUE, rows, pData, tbname, fields, numFields);
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
rows, pData, tbname, fields, numFields);
pRequest->syncQuery = true;
if (!pRequest->pDb) {
@ -1562,12 +1566,12 @@ end:
return code;
}
static void* getRawDataFromRes(void *pRetrieve){
static void* getRawDataFromRes(void* pRetrieve) {
void* rawData = NULL;
// deal with compatibility
if(*(int64_t*)pRetrieve == 0){
if (*(int64_t*)pRetrieve == 0) {
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
}else if(*(int64_t*)pRetrieve == 1){
} else if (*(int64_t*)pRetrieve == 1) {
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
}
ASSERT(rawData != NULL);
@ -1575,7 +1579,7 @@ static void* getRawDataFromRes(void *pRetrieve){
}
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if(taos == NULL || data == NULL){
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -1644,11 +1648,11 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
strcpy(pName.tname, tbName);
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
// code = TSDB_CODE_SUCCESS;
// continue;
// }
// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
// code = TSDB_CODE_SUCCESS;
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1704,7 +1708,7 @@ end:
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if(taos == NULL || data == NULL){
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
@ -1757,7 +1761,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
uDebug(LOG_ID_TAG" write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum);
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum);
while (++rspObj.resIter < rspObj.rsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
if (!rspObj.rsp.withSchema) {
@ -1770,7 +1774,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
uDebug(LOG_ID_TAG" write raw metadata block tbname:%s", LOG_ID_VALUE, tbName);
uDebug(LOG_ID_TAG " write raw metadata block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbName);
@ -1817,11 +1821,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
// code = TSDB_CODE_SUCCESS;
// continue;
// }
// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
// code = TSDB_CODE_SUCCESS;
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
goto end;
}

File diff suppressed because it is too large Load Diff

View File

@ -15,20 +15,20 @@
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndPrivilege.h"
#include "mndVgroup.h"
#include "mndShow.h"
#include "mndDb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
#define MND_CONSUMER_VER_NUMBER 2
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_MAX_GROUP_PER_TOPIC 100
#define MND_MAX_GROUP_PER_TOPIC 100
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
@ -56,7 +56,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
@ -68,10 +68,11 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) {
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId,
(int32_t)sizeof(SMqConsumerClearMsg));
return;
}
@ -88,13 +89,14 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo*
return;
}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
bool enableReplay) {
SMqTopicObj *pTopic = NULL;
int32_t code = 0;
int32_t code = 0;
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
char *pOneTopic = taosArrayGetP(pTopicList, i);
pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
code = -1;
@ -112,11 +114,11 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED;
}
if(enableReplay){
if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){
if (enableReplay) {
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
goto FAILED;
}else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
} else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
if (pDb == NULL) {
code = -1;
@ -132,7 +134,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
}
mndTransSetDbName(pTrans, pOneTopic, NULL);
if(mndTransCheckConflict(pMnode, pTrans) != 0){
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
code = -1;
goto FAILED;
}
@ -172,7 +174,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
if (pTrans == NULL) {
goto FAIL;
}
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0){
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0) {
goto FAIL;
}
@ -197,7 +199,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
mError("consumer:0x%" PRIx64 " failed to be found to clear it", pClearMsg->consumerId);
return 0;
}
@ -226,15 +228,15 @@ FAIL:
return -1;
}
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char* user){
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
if(rsp->topicPrivileges == NULL){
if (rsp->topicPrivileges == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
for(int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++){
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqTopicObj* pTopic = mndAcquireTopic(pMnode, topic);
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { // terrno has been set by callee function
continue;
}
@ -252,10 +254,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR
}
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t code = 0;
SMnode *pMnode = pMsg->info.node;
SMqHbReq req = {0};
SMqHbRsp rsp = {0};
int32_t code = 0;
SMnode *pMnode = pMsg->info.node;
SMqHbReq req = {0};
SMqHbRsp rsp = {0};
SMqConsumerObj *pConsumer = NULL;
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
@ -264,7 +266,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
goto end;
}
int64_t consumerId = req.consumerId;
int64_t consumerId = req.consumerId;
pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " not exist", consumerId);
@ -273,7 +275,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
goto end;
}
code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
if(code != 0){
if (code != 0) {
goto end;
}
@ -295,12 +297,12 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
TopicOffsetRows* data = taosArrayGet(req.topics, i);
for (int i = 0; i < taosArrayGetSize(req.topics); i++) {
TopicOffsetRows *data = taosArrayGet(req.topics, i);
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
if (pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
@ -308,7 +310,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
}
taosWLockLatch(&pSub->lock);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
if(pConsumerEp){
if (pConsumerEp) {
taosArrayDestroy(pConsumerEp->offsetRows);
pConsumerEp->offsetRows = data->offsetRows;
data->offsetRows = NULL;
@ -413,7 +415,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created
if(pSub == NULL) {
if (pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
@ -426,7 +428,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if(pTopic == NULL) {
if (pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
@ -460,12 +462,12 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
for (int32_t j = 0; j < vgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
// char offsetKey[TSDB_PARTITION_KEY_LEN];
// mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
// char offsetKey[TSDB_PARTITION_KEY_LEN];
// mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
if(epoch == -1){
if (epoch == -1) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if(pVgroup){
if (pVgroup) {
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
mndReleaseVgroup(pMnode, pVgroup);
}
@ -495,7 +497,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
goto FAIL;
}
SMqRspHead* pHead = buf;
SMqRspHead *pHead = buf;
pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
pHead->epoch = serverEpoch;
@ -503,7 +505,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
pHead->walsver = 0;
pHead->walever = 0;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqAskEpRsp(&abuf, &rsp);
@ -555,21 +556,20 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup;
SMqConsumerObj *pExistedConsumer = NULL;
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
STrans *pTrans = NULL;
SArray *pTopicList = subscribe.topicNames;
taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
int32_t newTopicNum = taosArrayGetSize(pTopicList);
for(int i = 0; i < newTopicNum; i++){
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char*)taosArrayGetP(pTopicList, i));
if(gNum >= MND_MAX_GROUP_PER_TOPIC){
for (int i = 0; i < newTopicNum; i++) {
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
code = terrno;
goto _over;
@ -600,7 +600,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
@ -787,16 +787,15 @@ CM_DECODE_OVER:
}
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
pConsumer->epoch);
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
pConsumer->subscribeTime = pConsumer->createTime;
return 0;
}
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
mndConsumerStatusName(pConsumer->status));
mndConsumerStatusName(pConsumer->status));
tDeleteSMqConsumerObj(pConsumer, false);
return 0;
}
@ -823,7 +822,7 @@ static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic
taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics));
break;
}
}
@ -839,7 +838,7 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo
taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
break;
}
}
@ -854,13 +853,13 @@ static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pT
taosMemoryFree(topic);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
break;
}
}
}
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
static bool existInCurrentTopicList(const SMqConsumerObj *pConsumer, const char *pTopic) {
bool existing = false;
int32_t size = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < size; i++) {
@ -877,7 +876,7 @@ static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char*
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64,
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime);
taosWLockLatch(&pOldConsumer->lock);
@ -888,19 +887,21 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId);
// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
// }
//
// int32_t prevStatus = pOldConsumer->status;
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId);
// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
// }
//
// int32_t prevStatus = pOldConsumer->status;
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ",
// reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus),
// mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime,
// (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
@ -909,7 +910,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
}
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
@ -940,11 +941,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
@ -963,11 +964,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
}
taosWUnLockLatch(&pOldConsumer->lock);
@ -1025,8 +1026,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
int32_t cols = 0;
// consumer id
char consumerIdHex[32] = {0};
sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId);
char consumerIdHex[32] = {0};
sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, pConsumer->consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1081,12 +1082,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
char buf[TSDB_OFFSET_LEN] = {0};
char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -14,8 +14,8 @@
*/
#include "tq.h"
#include "vnd.h"
#include "tqCommon.h"
#include "vnd.h"
// 0: not init
// 1: already inited
@ -865,9 +865,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
return 0;
}
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
@ -991,7 +989,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
} else {
SStreamTaskState* p = streamTaskGetStatus(pTask);
ETaskStatus s = p->state;
ETaskStatus s = p->state;
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
@ -1065,7 +1063,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
@ -1104,7 +1102,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
pRsp->info.handle = NULL;
SStreamCheckpointSourceReq req = {0};
SDecoder decoder;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
@ -1195,8 +1193,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr,
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -74,11 +74,11 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
void* pKey = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
int32_t code = 0;
void* pKey = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;

View File

@ -207,8 +207,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
goto END;
}
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64 " 0x%" PRIx64,
vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader);
@ -303,7 +303,7 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
int32_t code = 0;
while(1) {
while (1) {
code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -322,7 +322,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
// retry
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = code;
@ -369,7 +370,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
// todo ignore the error in wal?
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
SWalReader* pWalReader = pReader->pWalReader;
SWalReader* pWalReader = pReader->pWalReader;
SSDataBlock* pDataBlock = NULL;
uint64_t st = taosGetTimestampMs();
@ -387,22 +388,22 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
pReader->nextBlk = 0;
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
pReader->msg.ver);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if ((pSubmitTbData->source & sourceExcluded) != 0){
if ((pSubmitTbData->source & sourceExcluded) != 0) {
pReader->nextBlk += 1;
continue;
}
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
if(pDataBlock == NULL){
if (pDataBlock == NULL) {
pDataBlock = createOneDataBlock(pRes, true);
}else{
} else {
blockDataMerge(pDataBlock, pRes);
}
}
@ -414,16 +415,16 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
if(pDataBlock != NULL){
if (pDataBlock != NULL) {
blockDataCleanup(pReader->pResBlock);
copyDataBlock(pReader->pResBlock, pDataBlock);
blockDataDestroy(pDataBlock);
return true;
}else{
} else {
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
}
if(taosGetTimestampMs() - st > 1000){
if (taosGetTimestampMs() - st > 1000) {
return false;
}
}
@ -448,17 +449,11 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
return 0;
}
SWalReader* tqGetWalReader(STqReader* pReader) {
return pReader->pWalReader;
}
SWalReader* tqGetWalReader(STqReader* pReader) { return pReader->pWalReader; }
SSDataBlock* tqGetResultBlock (STqReader* pReader) {
return pReader->pResBlock;
}
SSDataBlock* tqGetResultBlock(STqReader* pReader) { return pReader->pResBlock; }
int64_t tqGetResultBlockTime(STqReader *pReader){
return pReader->lastTs;
}
int64_t tqGetResultBlockTime(STqReader* pReader) { return pReader->lastTs; }
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
if (pReader->msg.msgStr == NULL) {
@ -599,7 +594,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
if (IS_STR_DATA_TYPE(pColVal->type)) {
char val[65535 + 2] = {0};
if(COL_VAL_IS_VALUE(pColVal)){
if (COL_VAL_IS_VALUE(pColVal)) {
if (pColVal->value.pData != NULL) {
memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
}
@ -874,7 +869,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
tColDataGetValue(pCol, i, &colVal);
if(doSetVal(pColData, curRow - lastRow, &colVal) != TDB_CODE_SUCCESS){
if (doSetVal(pColData, curRow - lastRow, &colVal) != TDB_CODE_SUCCESS) {
goto FAIL;
}
sourceIdx++;
@ -958,10 +953,10 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
} else if (colVal.cid == pColData->info.colId) {
if(doSetVal(pColData, curRow - lastRow, &colVal) != TDB_CODE_SUCCESS){
if (doSetVal(pColData, curRow - lastRow, &colVal) != TDB_CODE_SUCCESS) {
goto FAIL;
}
sourceIdx++;
sourceIdx++;
targetIdx++;
}
}
@ -1001,7 +996,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t) taosArrayGetSize(tbUidList));
tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
return 0;
}
@ -1027,9 +1022,7 @@ bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
}
bool tqCurrentBlockConsumed(const STqReader* pReader) {
return pReader->msg.msgStr == NULL;
}
bool tqCurrentBlockConsumed(const STqReader* pReader) { return pReader->msg.msgStr == NULL; }
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
@ -1071,9 +1064,11 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* list = NULL;
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
&list, pTqHandle->execHandle.task);
if (ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
pTqHandle->consumerId);
taosArrayDestroy(list);
taosHashCancelIterate(pTq->pHandle, pIter);
taosWUnLockLatch(&pTq->lock);

View File

@ -63,8 +63,8 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
return 0;
}
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res){
uint64_t ts = 0;
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
uint64_t ts = 0;
qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
@ -97,27 +97,27 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
while (1) {
SSDataBlock* pDataBlock = NULL;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0){
if (code != 0) {
return code;
}
if(pRequest->enableReplay){
if(IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL){
if (pRequest->enableReplay) {
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}
if(pHandle->block == NULL){
if (pHandle->block == NULL) {
if (pDataBlock == NULL) {
break;
}
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pHandle->block = createOneDataBlock(pDataBlock, true);
// pHandle->block = createDataBlock();
// copyDataBlock(pHandle->block, pDataBlock);
// pHandle->block = createDataBlock();
// copyDataBlock(pHandle->block, pDataBlock);
pHandle->blockTime = offset.ts;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0){
if (code != 0) {
return code;
}
}
@ -132,7 +132,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
if (pDataBlock == NULL) {
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}else{
} else {
copyDataBlock(pHandle->block, pDataBlock);
STqOffsetVal offset = {0};
@ -141,7 +141,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
pHandle->blockTime = offset.ts;
}
break;
}else{
} else {
if (pDataBlock == NULL) {
break;
}
@ -250,7 +250,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0;
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows,
int8_t sourceExcluded) {
STqExecHandle* pExec = &pHandle->execHandle;
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
@ -266,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
}
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
goto loop_table;
}
if (pRsp->withTbName) {
@ -303,7 +304,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
goto loop_table;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
@ -334,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
}
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
if ((pSubmitTbDataRet->source & sourceExcluded) != 0) {
goto loop_db;
}
if (pRsp->withTbName) {
@ -371,7 +372,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
goto loop_db;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {

View File

@ -72,10 +72,11 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
return 0;
}
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
*pBlockReturned = false;
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
@ -132,7 +133,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
terrno = 0;
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, *pOffset);
@ -156,7 +157,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
@ -167,15 +168,15 @@ end : {
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
return code;
}
}
}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset);
if (offset->type != TMQ_OFFSET__LOG) {
@ -211,14 +212,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int64_t fetchVer = offset->version;
uint64_t st = taosGetTimestampMs();
int totalRows = 0;
int totalRows = 0;
while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
ASSERT(savedEpoch <= pRequest->epoch);
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
@ -230,7 +233,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
@ -257,9 +262,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end;
}
if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) {
if (totalRows >= 4096 || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
fetchVer++;
@ -279,7 +286,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// 1. reset the offset if needed
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
bool blockReturned = false;
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
@ -289,7 +296,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if (blockReturned) {
return 0;
}
} else if(reqOffset.type == 0){ // use the consumer specified offset
} else if (reqOffset.type == 0) { // use the consumer specified offset
uError("req offset type is 0");
return TSDB_CODE_TMQ_INVALID_MSG;
}
@ -452,8 +459,8 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
int32_t code = TSDB_CODE_SUCCESS;
if (pDelay != NULL) {
*pDelay = 0;

View File

@ -311,44 +311,57 @@ class TDTestCase:
return
def consumeExcluded(self):
tdSql.execute(f'create topic topic_excluded as database db_taosx')
def consumeTest(self):
tdSql.execute(f'create database if not exists d1 vgroups 1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)')
tdSql.query("select * from st")
tdSql.checkRows(8)
tdSql.execute(f'create topic topic_excluded with meta as database d1')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"msg.consume.excluded": 1
}
consumer = Consumer(consumer_dict)
tdLog.debug("test subscribe topic created by other user")
exceptOccured = False
try:
consumer.subscribe(["topic_excluded"])
except TmqError:
exceptOccured = True
if exceptOccured:
tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
if not res:
if index != 1:
tdLog.exit("consume error")
break
err = res.error()
if err is not None:
raise err
val = res.value()
if val is None:
continue
cnt = 0;
for block in val:
print(block.fetchall())
cnt += len(block.fetchall())
if cnt != 8:
tdLog.exit("consume error")
index += 1
finally:
consumer.close()
def run(self):
self.consumeTest()
tdSql.prepare()
self.checkWal1VgroupOnlyMeta()
@ -362,7 +375,6 @@ class TDTestCase:
self.checkSnapshotMultiVgroups()
self.checkWalMultiVgroupsWithDropTable()
# self.consumeExcluded()
self.checkSnapshotMultiVgroupsWithDropTable()