Merge pull request #28300 from taosdata/enh/TS-5441-3.0

enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
Shengliang Guan 2024-10-30 13:43:41 +08:00 committed by GitHub
commit ea9ed2e538
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 604 additions and 416 deletions

View File

@ -4104,7 +4104,6 @@ void tDeleteMqMetaRsp(SMqMetaRsp* pRsp);
#define MQ_DATA_RSP_VERSION 100 #define MQ_DATA_RSP_VERSION 100
typedef struct { typedef struct {
struct {
SMqRspHead head; SMqRspHead head;
STqOffsetVal rspOffset; STqOffsetVal rspOffset;
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
@ -4115,7 +4114,6 @@ typedef struct {
SArray* blockData; SArray* blockData;
SArray* blockTbName; SArray* blockTbName;
SArray* blockSchema; SArray* blockSchema;
};
union{ union{
struct{ struct{

View File

@ -176,8 +176,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen); char* msgBuf, int32_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* fields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen); int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut); int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);

View File

@ -70,7 +70,7 @@ extern int32_t tdbDebugFlag;
extern int32_t sndDebugFlag; extern int32_t sndDebugFlag;
extern int32_t simDebugFlag; extern int32_t simDebugFlag;
extern int32_t tqClientDebug; extern int32_t tqClientDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc); int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
void taosCloseLog(); void taosCloseLog();

View File

@ -53,9 +53,7 @@
#define TMQ_META_VERSION "1.0" #define TMQ_META_VERSION "1.0"
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
SColCmprWrapper* pColCmprRow, cJSON** pJson) { SColCmprWrapper* pColCmprRow, cJSON** pJson) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -890,9 +888,6 @@ end:
} }
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
SMCreateStbReq pReq = {0}; SMCreateStbReq pReq = {0};
@ -1003,9 +998,6 @@ end:
} }
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVDropStbReq req = {0}; SVDropStbReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
SMDropStbReq pReq = {0}; SMDropStbReq pReq = {0};
@ -1115,9 +1107,6 @@ static void destroyCreateTbReqBatch(void* data) {
} }
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1304,9 +1293,6 @@ static void destroyDropTbReqBatch(void* data) {
} }
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1419,9 +1405,6 @@ end:
} }
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SDeleteRes req = {0}; SDeleteRes req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
char sql[256] = {0}; char sql[256] = {0};
@ -1457,9 +1440,6 @@ end:
} }
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
if (taos == NULL || meta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
SDecoder dcoder = {0}; SDecoder dcoder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1622,7 +1602,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
RAW_NULL_CHECK(pVgHash); RAW_NULL_CHECK(pVgHash);
RAW_RETURN_CHECK( RAW_RETURN_CHECK(
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0)); RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1682,7 +1662,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
RAW_NULL_CHECK(pVgHash); RAW_NULL_CHECK(pVgHash);
RAW_RETURN_CHECK( RAW_RETURN_CHECK(
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0)); RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1708,116 +1688,6 @@ static void* getRawDataFromRes(void* pRetrieve) {
return rawData; return rawData;
} }
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SRequestObj* pRequest = NULL;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
pRequest->syncQuery = true;
rspObj.resIter = -1;
rspObj.resType = RES_TYPE__TMQ;
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq data rsp failed");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
struct SCatalog* pCatalog = NULL;
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
SRequestConnInfo conn = {0};
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
conn.requestId = pRequest->requestId;
conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgHash);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
if (!rspObj.dataRsp.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
SVgroupInfo vg = {0};
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
RAW_NULL_CHECK(fields);
for (int i = 0; i < pSW->nCols; i++) {
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
}
}
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteMqDataRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta);
return code;
}
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) { static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
// find schema data info // find schema data info
int32_t code = 0; int32_t code = 0;
@ -1855,152 +1725,368 @@ end:
return code; return code;
} }
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { typedef enum {
if (taos == NULL || data == NULL) { WRITE_RAW_INIT_START = 0,
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); WRITE_RAW_INIT_OK,
return TSDB_CODE_INVALID_PARA; WRITE_RAW_INIT_FAIL,
} WRITE_RAW_INIT_STATUS;
static SHashObj* writeRawCache = NULL;
static int8_t initFlag = 0;
static int8_t initedFlag = WRITE_RAW_INIT_START;
typedef struct {
SHashObj* pVgHash;
SHashObj* pNameHash;
SHashObj* pMetaHash;
} rawCacheInfo;
typedef struct {
SVgroupInfo vgInfo;
int64_t uid;
int64_t suid;
} tbInfo;
static void tmqFreeMeta(void* data) {
STableMeta* pTableMeta = *(STableMeta**)data;
taosMemoryFree(pTableMeta);
}
static void freeRawCache(void* data) {
rawCacheInfo* pRawCache = (rawCacheInfo*)data;
taosHashCleanup(pRawCache->pMetaHash);
taosHashCleanup(pRawCache->pNameHash);
taosHashCleanup(pRawCache->pVgHash);
}
static int32_t initRawCacheHash() {
if (writeRawCache == NULL) {
writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (writeRawCache == NULL) {
return terrno;
} }
int32_t code = TSDB_CODE_SUCCESS; taosHashSetFreeFp(writeRawCache, freeRawCache);
SHashObj* pVgHash = NULL; }
SQuery* pQuery = NULL; return 0;
SMqRspObj rspObj = {0}; }
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SHashObj* pCreateTbHash = NULL;
SRequestObj* pRequest = NULL; static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); char* p = (char*)rawData;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// column length |
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(uint64_t);
int8_t* fields = p;
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
pRequest->syncQuery = true; return true;
rspObj.resIter = -1; }
rspObj.resType = RES_TYPE__TMQ_METADATA; for (int i = 0; i < pSW->nCols; i++) {
int j = 0;
for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
SSchema* pColSchema = &pTableMeta->schema[j];
char* fieldName = pSW->pSchema[i].name;
if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
return true;
}
break;
}
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (j == pTableMeta->tableInfo.numOfColumns) return true;
}
return false;
}
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
int32_t code = 0;
void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
if (cacheInfo == NULL) {
*pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pVgHash);
*pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pNameHash);
*pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(*pMetaHash);
taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
} else {
rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
*pVgHash = info->pVgHash;
*pNameHash = info->pNameHash;
*pMetaHash = info->pMetaHash;
}
return 0;
end:
taosHashCleanup(*pMetaHash);
taosHashCleanup(*pNameHash);
taosHashCleanup(*pVgHash);
return code;
}
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
int32_t code = 0;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest));
(*pRequest)->syncQuery = true;
if (!(*pRequest)->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
conn->requestId = (*pRequest)->requestId;
conn->requestObjRefId = (*pRequest)->self;
conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
end:
return code;
}
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
SMqRspObj* rspObj) {
int8_t dataVersion = *(int8_t*)data; int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) { if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t); dataLen -= sizeof(int8_t) + sizeof(int32_t);
} }
tDecoderInit(&decoder, data, dataLen); rspObj->resIter = -1;
code = tDecodeSTaosxRsp(&decoder, &rspObj.dataRsp); tDecoderInit(decoder, data, dataLen);
int32_t code = func(decoder, &rspObj->dataRsp);
if (code != 0) { if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed"); SET_ERROR_MSG("decode mq taosx data rsp failed");
code = TSDB_CODE_INVALID_MSG; }
return code;
}
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
int32_t code = 0;
STableMeta* pTableMeta = NULL;
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
if (tmpInfo == NULL || retry > 0) {
tbInfo info = {0};
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
if (pCreateReqDst && tmpInfo == NULL) { // change stable name to get meta
tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
}
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
info.uid = pTableMeta->uid;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
info.suid = pTableMeta->suid;
} else {
info.suid = pTableMeta->uid;
}
code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
if (code != 0) {
taosMemoryFree(pTableMeta);
goto end;
}
if (pCreateReqDst) {
pTableMeta->vgId = info.vgInfo.vgId;
pTableMeta->uid = pCreateReqDst->uid;
pCreateReqDst->ctb.suid = pTableMeta->suid;
}
RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
RAW_RETURN_CHECK(
taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
}
if (pTableMeta == NULL || retry > 0) {
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
if (code != 0) {
taosMemoryFree(pTableMeta);
goto end; goto end;
} }
if (!pRequest->pDb) { } else {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; pTableMeta = *pTableMetaTmp;
goto end; pTableMeta->uid = tmpInfo->uid;
pTableMeta->vgId = tmpInfo->vgInfo.vgId;
} }
}
*pMeta = pTableMeta;
struct SCatalog* pCatalog = NULL; end:
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); return code;
}
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
SRequestObj* pRequest = NULL;
SCatalog* pCatalog = NULL;
SRequestConnInfo conn = {0}; SRequestConnInfo conn = {0};
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
conn.requestId = pRequest->requestId; uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
conn.requestObjRefId = pRequest->self; RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SHashObj* pVgHash = NULL;
SHashObj* pNameHash = NULL;
SHashObj* pMetaHash = NULL;
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
int retry = 0;
while (1) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery)); RAW_RETURN_CHECK(smlInitHandle(&pQuery));
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
RAW_NULL_CHECK(pVgHash);
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) { while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
if (!rspObj.dataRsp.withSchema) { if (!rspObj.dataRsp.withSchema) {
goto end; goto end;
} }
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
if (!tbName) { RAW_NULL_CHECK(tbName);
SET_ERROR_MSG("block tbname is null"); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
code = terrno; RAW_NULL_CHECK(pSW);
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
STableMeta* pTableMeta = NULL;
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
rawData, retry));
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
goto end;
}
}
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
qDestroyQuery(pQuery);
pQuery = NULL;
rspObj.resIter = -1;
continue;
}
break;
}
end:
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteMqDataRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
return code;
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
SHashObj* pCreateTbHash = NULL;
SRequestObj* pRequest = NULL;
SCatalog* pCatalog = NULL;
SRequestConnInfo conn = {0};
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
SHashObj* pVgHash = NULL;
SHashObj* pNameHash = NULL;
SHashObj* pMetaHash = NULL;
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
int retry = 0;
while (1) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
if (!rspObj.dataRsp.withSchema) {
goto end; goto end;
} }
uDebug(LOG_ID_TAG " write raw metadata block tbname:%s", LOG_ID_VALUE, tbName); const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
// find schema data info // find schema data info
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
SVgroupInfo vg = {0}; STableMeta* pTableMeta = NULL;
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
if (pCreateReqDst) { // change stable name to get meta &pTableMeta, pSW, rawData, retry));
tstrncpy(pName.tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
}
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
if (pCreateReqDst) {
pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid;
pCreateReqDst->ctb.suid = pTableMeta->suid;
}
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
SET_ERROR_MSG("calloc fields failed");
code = terrno;
goto end;
}
for (int i = 0; i < pSW->nCols; i++) {
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
char err[ERR_MSG_LEN] = {0}; char err[ERR_MSG_LEN] = {0};
SVCreateTbReq* pCreateReqTmp = NULL; code =
if (pCreateReqDst) { rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
}
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);
if (pCreateReqTmp != NULL) {
tdDestroySVCreateTbReq(pCreateReqTmp);
taosMemoryFree(pCreateReqTmp);
}
taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err); SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
goto end; goto end;
} }
} }
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
qDestroyQuery(pQuery);
pQuery = NULL;
rspObj.resIter = -1;
continue;
}
break;
}
end: end:
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteSTaosxRsp(&rspObj.dataRsp);
void* pIter = taosHashIterate(pCreateTbHash, NULL); void* pIter = taosHashIterate(pCreateTbHash, NULL);
while (pIter) { while (pIter) {
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
pIter = taosHashIterate(pCreateTbHash, pIter); pIter = taosHashIterate(pCreateTbHash, pIter);
} }
taosHashCleanup(pCreateTbHash); taosHashCleanup(pCreateTbHash);
tDeleteSTaosxRsp(&rspObj.dataRsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta);
return code; return code;
} }
@ -2087,7 +2173,7 @@ char* tmq_get_json_meta(TAOS_RES* res) {
processSimpleMeta(&rspObj->metaRsp, &pJson); processSimpleMeta(&rspObj->metaRsp, &pJson);
string = cJSON_PrintUnformatted(pJson); string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
} else{ } else {
uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res); uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
} }
@ -2199,7 +2285,31 @@ void tmq_free_raw(tmq_raw_data raw) {
(void)memset(terrMsg, 0, ERR_MSG_LEN); (void)memset(terrMsg, 0, ERR_MSG_LEN);
} }
static int32_t writeRawInit() {
while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
if (old == 0) {
int32_t code = initRawCacheHash();
if (code != 0) {
uError("tmq writeRawImpl init error:%d", code);
atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
return code;
}
atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
}
}
if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
return TSDB_CODE_INTERNAL_ERROR;
}
return 0;
}
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
if (writeRawInit() != 0) {
return TSDB_CODE_INTERNAL_ERROR;
}
if (type == TDMT_VND_CREATE_STB) { if (type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, buf, len); return taosCreateStb(taos, buf, len);
} else if (type == TDMT_VND_ALTER_STB) { } else if (type == TDMT_VND_ALTER_STB) {
@ -2214,10 +2324,10 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
return taosDropTable(taos, buf, len); return taosDropTable(taos, buf, len);
} else if (type == TDMT_VND_DELETE) { } else if (type == TDMT_VND_DELETE) {
return taosDeleteData(taos, buf, len); return taosDeleteData(taos, buf, len);
} else if (type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_METADATA) { } else if (type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRawMetaDataImpl(taos, buf, len); return tmqWriteRawMetaDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_BATCH_META) { } else if (type == RES_TYPE__TMQ_BATCH_META) {
return tmqWriteBatchMetaDataImpl(taos, buf, len); return tmqWriteBatchMetaDataImpl(taos, buf, len);
} }
@ -2225,7 +2335,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
} }
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
if (!taos) { if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -24,12 +24,9 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
#define tqFatalC(...) do { if (cDebugFlag & DEBUG_FATAL || tqClientDebug) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebug) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define tqWarnC(...) do { if (cDebugFlag & DEBUG_WARN || tqClientDebug) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebug) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebug) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTraceC(...) do { if (cDebugFlag & DEBUG_TRACE || tqClientDebug) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
@ -831,8 +828,8 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
} }
code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
if (code != 0){ if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d", tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
} }
} }
@ -857,7 +854,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
return; return;
} }
code = innerCommitAll(tmq, pParamSet); code = innerCommitAll(tmq, pParamSet);
if (code != 0){ if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
} }
@ -957,7 +954,8 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
} }
tqClientDebug = rsp.debugFlag; tqClientDebugFlag = rsp.debugFlag;
tDestroySMqHbRsp(&rsp); tDestroySMqHbRsp(&rsp);
END: END:
@ -978,6 +976,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch; req.epoch = tmq->epoch;
req.pollFlag = atomic_load_8(&tmq->pollFlag); req.pollFlag = atomic_load_8(&tmq->pollFlag);
tqDebugC("consumer:0x%" PRIx64 " send heartbeat, pollFlag:%d", tmq->consumerId, req.pollFlag);
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
if (req.topics == NULL) { if (req.topics == NULL) {
goto END; goto END;
@ -1063,7 +1062,7 @@ END:
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
if (tmrId != NULL) { if (tmrId != NULL) {
bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
tqDebugC("reset timer fo tmq hb:%d", ret); tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
} }
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){ if (ret != 0){
@ -1269,7 +1268,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
}
goto END; goto END;
} }
@ -1422,7 +1423,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer); &pTmq->epTimer);
tqDebugC("reset timer fo tmq ask ep:%d", ret); tqDebugC("reset timer for tmq ask ep:%d", ret);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn; tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
@ -1430,7 +1431,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
pTmq->autoCommitInterval / 1000.0); pTmq->autoCommitInterval / 1000.0);
bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->commitTimer); &pTmq->commitTimer);
tqDebugC("reset timer fo commit:%d", ret); tqDebugC("reset timer for commit:%d", ret);
} else { } else {
tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
} }

View File

@ -548,7 +548,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
@ -2000,7 +2000,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag},
{"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
{"tqClientDebug", &tqClientDebug}, {"tqClientDebugFlag", &tqClientDebugFlag},
}; };
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -10983,6 +10983,7 @@ _exit:
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
return 0; return 0;
} }
@ -11094,6 +11095,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
} }
} }
_exit: _exit:
return code; return code;
} }

View File

@ -239,12 +239,13 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer)); MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user)); MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus);
if (req.pollFlag == 1){ if (req.pollFlag == 1){
atomic_store_32(&pConsumer->pollStatus, 0); atomic_store_32(&pConsumer->pollStatus, 0);
} }
storeOffsetRows(pMnode, &req, pConsumer); storeOffsetRows(pMnode, &req, pConsumer);
rsp.debugFlag = tqClientDebug; rsp.debugFlag = tqClientDebugFlag;
code = buildMqHbRsp(pMsg, &rsp); code = buildMqHbRsp(pMsg, &rsp);
END: END:

View File

@ -243,7 +243,7 @@ int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, con
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, int64_t *createTime);
int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished);
// sma // sma

View File

@ -163,7 +163,7 @@ int32_t metaDropTables(SMeta* pMeta, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime);
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);

View File

@ -371,7 +371,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0; return 0;
} }
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) {
void *pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int64_t version; int64_t version;
@ -407,6 +407,9 @@ _query:
} }
} else if (me.type == TSDB_CHILD_TABLE) { } else if (me.type == TSDB_CHILD_TABLE) {
uid = me.ctbEntry.suid; uid = me.ctbEntry.suid;
if (createTime != NULL){
*createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc); tDecoderClear(&dc);
goto _query; goto _query;
} else { } else {
@ -617,7 +620,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock); pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
if (!pSW) return NULL; if (!pSW) return NULL;
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version); pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);

View File

@ -552,7 +552,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) { void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
bool ret = false; bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true; ret = true;
} }

View File

@ -263,7 +263,7 @@ bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; }
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
bool ret = false; bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true; ret = true;
} }
@ -669,7 +669,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
(pReader->cachedSchemaVer != sversion)) { (pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper); tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
"version %d, possibly dropped table", "version %d, possibly dropped table",
@ -961,10 +961,8 @@ END:
return code; return code;
} }
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
SSDataBlock* block = NULL;
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) { if (pSubmitTbData == NULL) {
return terrno; return terrno;
@ -980,7 +978,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
pReader->lastBlkUid = uid; pReader->lastBlkUid = uid;
tDeleteSchemaWrapper(pReader->pSchemaWrapper); tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
if (pReader->pSchemaWrapper == NULL) { if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);

View File

@ -210,13 +210,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
if (pDataBlock != NULL && pDataBlock->info.rows > 0) { if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
continue;
}
} else {
char* tbName = taosStrdup(qExtractTbnameFromTask(task)); char* tbName = taosStrdup(qExtractTbnameFromTask(task));
if (tbName == NULL) { if (tbName == NULL) {
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId); tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
@ -227,21 +220,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
continue; continue;
} }
} }
}
if (pRsp->withSchema) { if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
if (tqAddBlockSchemaToRsp(pExec, pRsp) != 0){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue; continue;
} }
} }
}
if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0) { pTq->pVnode->config.tsdbCfg.precision) != 0) {
@ -249,12 +234,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
continue; continue;
} }
pRsp->blockNum++; pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt <= tmqRowSize) continue; if (rowCnt <= tmqRowSize) continue;
}
} }
// get meta // get meta
@ -296,6 +278,54 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
return code; return code;
} }
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
int32_t code = 0;
void* createReq = NULL;
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
if (pRsp->createTableLen == NULL) {
code = terrno;
goto END;
}
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
if (pRsp->createTableReq == NULL) {
code = terrno;
goto END;
}
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
goto END;
}
createReq = taosMemoryCalloc(1, len);
if (createReq == NULL){
code = terrno;
goto END;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
if (code < 0) {
goto END;
}
if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
code = terrno;
goto END;
}
if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
code = terrno;
goto END;
}
pRsp->createTableNum++;
return 0;
END:
taosMemoryFree(createReq);
return code;
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
int32_t code = 0; int32_t code = 0;
@ -315,7 +345,8 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
} }
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet); int64_t createTime = INT64_MAX;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
if (code != 0) { if (code != 0) {
tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId); tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
goto END; goto END;
@ -333,46 +364,13 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
} }
} }
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) { if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
if (pRsp->createTableLen == NULL) { if (code != 0){
code = terrno; tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
goto END;
}
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
if (pRsp->createTableReq == NULL) {
code = terrno;
goto END; goto END;
} }
} }
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
goto END;
}
void* createReq = taosMemoryCalloc(1, len);
if (createReq == NULL){
code = terrno;
goto END;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
tEncoderClear(&encoder);
if (code < 0) {
taosMemoryFree(createReq);
goto END;
}
if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
taosMemoryFree(createReq);
goto END;
}
if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
taosMemoryFree(createReq);
goto END;
}
pRsp->createTableNum++;
} }
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) { if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
goto END; goto END;

View File

@ -51,7 +51,8 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
if (pRsp->blockData != NULL) { if (pRsp->blockData != NULL) {
taosArrayDestroy(pRsp->blockData); taosArrayDestroy(pRsp->blockData);
pRsp->blockData = NULL; pRsp->blockData = NULL;
@ -71,6 +72,7 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
taosArrayDestroy(pRsp->blockSchema); taosArrayDestroy(pRsp->blockSchema);
pRsp->blockSchema = NULL; pRsp->blockSchema = NULL;
} }
return terrno; return terrno;
} }

View File

@ -702,7 +702,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
} }
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0); SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
if (pSW) { if (pSW) {
*num = pSW->nCols; *num = pSW->nCols;
tDeleteSchemaWrapper(pSW); tDeleteSchemaWrapper(pSW);

View File

@ -1495,6 +1495,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
// pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;

View File

@ -886,17 +886,32 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
return false; return false;
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) { int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
int ret = 0;
if(data == NULL) { if(data == NULL) {
uError("rawBlockBindData, data is NULL"); uError("rawBlockBindData, data is NULL");
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
void* tmp = void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
SVCreateTbReq *pCreateReqTmp = NULL;
if (tmp == NULL && pCreateTb != NULL){
ret = cloneSVreateTbReq(pCreateTb, &pCreateReqTmp);
if (ret != TSDB_CODE_SUCCESS){
uError("cloneSVreateTbReq error");
goto end;
}
}
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false); sizeof(pTableMeta->uid), pTableMeta, &pCreateReqTmp, &pTableCxt, true, false);
if (pCreateReqTmp != NULL) {
tdDestroySVCreateTbReq(pCreateReqTmp);
taosMemoryFree(pCreateReqTmp);
}
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt error"); uError("insGetTableDataCxt error");
goto end; goto end;
@ -948,12 +963,17 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
if (tFields != NULL && numFields > boundInfo->numOfBound) { // if (tFields != NULL && numFields > boundInfo->numOfBound) {
if (errstr != NULL) // if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound); // ret = TSDB_CODE_INVALID_PARA;
// goto end;
// }
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
if (tFields == NULL) { if (tFields == NULL) {
for (int j = 0; j < boundInfo->numOfBound; j++) { for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j]; SSchema* pColSchema = &pSchema[j];
@ -991,7 +1011,13 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) { for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j]; SSchema* pColSchema = &pSchema[j];
if (strcmp(pColSchema->name, tFields[i].name) == 0) { char* fieldName = NULL;
if (raw) {
fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
} else {
fieldName = ((TAOS_FIELD*)tFields)[i].name;
}
if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (errstr != NULL) if (errstr != NULL)
snprintf(errstr, errstrLen, snprintf(errstr, errstrLen,
@ -1011,6 +1037,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
pStart += numOfRows * sizeof(int32_t); pStart += numOfRows * sizeof(int32_t);
} else { } else {
pStart += BitmapLen(numOfRows); pStart += BitmapLen(numOfRows);
// for(int k = 0; k < numOfRows; k++) {
// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){
// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t)));
// }
// }
} }
char* pData = pStart; char* pData = pStart;

View File

@ -128,7 +128,7 @@ int32_t idxDebugFlag = 131;
int32_t sndDebugFlag = 131; int32_t sndDebugFlag = 131;
int32_t simDebugFlag = 131; int32_t simDebugFlag = 131;
int32_t tqClientDebug = 0; int32_t tqClientDebugFlag = 131;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;

View File

@ -57,7 +57,7 @@ class TDTestCase:
tdSql.checkData(0, 2, 0) tdSql.checkData(0, 2, 0)
tdSql.query("show dnode 1 variables like '%debugFlag'") tdSql.query("show dnode 1 variables like '%debugFlag'")
tdSql.checkRows(24) tdSql.checkRows(25)
tdSql.query("show dnode 1 variables like '____debugFlag'") tdSql.query("show dnode 1 variables like '____debugFlag'")
tdSql.checkRows(2) tdSql.checkRows(2)

View File

@ -131,14 +131,14 @@ class TDTestCase:
tdSql.checkData(0, 2, 1) tdSql.checkData(0, 2, 1)
tdSql.query("select * from ct3 order by c1 desc") tdSql.query("select * from ct3 order by c1 desc")
tdSql.checkRows(2) tdSql.checkRows(5)
tdSql.checkData(0, 1, 51) tdSql.checkData(0, 1, 51)
tdSql.checkData(0, 4, 940) tdSql.checkData(0, 4, 940)
tdSql.checkData(1, 1, 23) tdSql.checkData(1, 1, 23)
tdSql.checkData(1, 4, None) tdSql.checkData(1, 4, None)
tdSql.query("select * from st1 order by ts") tdSql.query("select * from st1 order by ts")
tdSql.checkRows(8) tdSql.checkRows(14)
tdSql.checkData(0, 1, 1) tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 3) tdSql.checkData(1, 1, 3)
tdSql.checkData(4, 1, 4) tdSql.checkData(4, 1, 4)
@ -180,7 +180,7 @@ class TDTestCase:
tdSql.checkData(6, 8, None) tdSql.checkData(6, 8, None)
tdSql.query("select * from ct1") tdSql.query("select * from ct1")
tdSql.checkRows(4) tdSql.checkRows(7)
tdSql.query("select * from ct2") tdSql.query("select * from ct2")
tdSql.checkRows(0) tdSql.checkRows(0)

View File

@ -79,6 +79,7 @@ static void msg_process(TAOS_RES* msg) {
} else { } else {
taosFprintfFile(g_fp, result); taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
taosFsyncFile(g_fp);
} }
} }
} }
@ -132,7 +133,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table ct0, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -175,7 +176,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query( pRes = taos_query(
pConn, pConn,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
"'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(1626006833703, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -189,6 +190,41 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1736006813600, -32222, 43, 'ewb', 99)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 drop column c4");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1736006833600, -4223, 344, 'bfs')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1766006833600, -4432, 4433, 'e23wb', 9349)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
@ -596,6 +632,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1"); tmq_conf_set(conf, "msg.consume.excluded", "1");
// tmq_conf_set(conf, "session.timeout.ms", "1000000");
// tmq_conf_set(conf, "max.poll.interval.ms", "20000"); // tmq_conf_set(conf, "max.poll.interval.ms", "20000");
if (g_conf.snapShot) { if (g_conf.snapShot) {
@ -636,6 +673,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
printf("cnt:%d\n", cnt);
msg_process(tmqmessage); msg_process(tmqmessage);
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
} else { } else {
@ -844,6 +882,8 @@ void initLogFile() {
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}", "{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
"\"colType\":5}", "\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\","
"\"colType\":8,\"colLength\":64}", "\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\","
@ -991,6 +1031,8 @@ void initLogFile() {
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}", "{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
"\"colType\":5}", "\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\","
"\"colType\":8,\"colLength\":64}", "\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\"," "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\","