Merge pull request #29666 from taosdata/feat/TS-5776

fix:[TS-5776]add raw type from consumer
This commit is contained in:
Shengliang Guan 2025-02-18 17:53:52 +08:00 committed by GitHub
commit 8bee0e7030
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
41 changed files with 1376 additions and 260 deletions

View File

@ -360,7 +360,8 @@ typedef enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3
TMQ_RES_METADATA = 3,
TMQ_RES_RAWDATA = 4
} tmq_res_t;
typedef struct tmq_topic_assignment {

View File

@ -121,10 +121,11 @@ enum {
TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
TMQ_MSG_TYPE__POLL_RAW_DATA_RSP,
};
static char* tmqMsgTypeStr[] = {
"data", "meta", "ask ep", "meta data", "wal info", "batch meta"
"data", "meta", "ask ep", "meta data", "wal info", "batch meta", "raw data"
};
enum {

View File

@ -2312,6 +2312,10 @@ typedef struct SSysTableSchema {
int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
#define RETRIEVE_TABLE_RSP_VERSION 0
#define RETRIEVE_TABLE_RSP_TMQ_VERSION 1
#define RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION 2
typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
@ -4200,7 +4204,9 @@ typedef struct {
STqOffsetVal reqOffset;
int8_t enableReplay;
int8_t sourceExcluded;
int8_t rawData;
int8_t enableBatchMeta;
SHashObj *uidHash; // to find if uid is duplicated
} SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -4274,13 +4280,21 @@ typedef struct {
SArray* createTableLen;
SArray* createTableReq;
};
struct{
int32_t len;
void* rawData;
};
};
void* data; //for free in client, only effected if type is data or metadata. raw data not effected
bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client)
} SMqDataRsp;
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
int32_t tDecodeMqRawDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
void tDeleteMqRawDataRsp(SMqDataRsp* pRsp);
int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
@ -4530,6 +4544,7 @@ typedef struct {
typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData>
bool raw;
} SSubmitReq2;
typedef struct {
@ -4538,8 +4553,9 @@ typedef struct {
char data[]; // SSubmitReq2
} SSubmitReq2Msg;
int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver);
int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq, SArray* rawList);
void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);

View File

@ -180,8 +180,11 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen, void* charsetCxt);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data);
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);

View File

@ -1016,6 +1016,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018)
#define TSDB_CODE_TMQ_RAW_DATA_SPLIT TAOS_DEF_ERROR_CODE(0, 0x4019)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -45,6 +45,7 @@ enum {
RES_TYPE__TMQ_META,
RES_TYPE__TMQ_METADATA,
RES_TYPE__TMQ_BATCH_META,
RES_TYPE__TMQ_RAWDATA,
};
#define SHOW_VARIABLES_RESULT_COLS 5
@ -55,6 +56,7 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD5_LEN (TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY)
#define TD_RES_TMQ_RAW(res) (*(int8_t*)(res) == RES_TYPE__TMQ_RAWDATA)
#define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)

View File

@ -502,7 +502,7 @@ void taos_close(TAOS *taos) {
}
int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return terrno;
}
@ -514,7 +514,7 @@ int taos_errno(TAOS_RES *res) {
}
const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return (const char *)tstrerror(terrno);
}
@ -554,6 +554,8 @@ void taos_free_result(TAOS_RES *res) {
tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (TD_RES_TMQ_BATCH_META(res)) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
} else if (TD_RES_TMQ_RAW(res)) {
tDeleteMqRawDataRsp(&pRsp->dataRsp);
}
taosMemoryFree(pRsp);
}
@ -572,7 +574,7 @@ void taos_kill_query(TAOS *taos) {
}
int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -583,7 +585,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL;
}
@ -643,7 +645,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
} else {
tscError("invalid result passed to taos_fetch_row");
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
terrno = TSDB_CODE_TMQ_INVALID_DATA;
return NULL;
}
}
@ -764,7 +766,7 @@ int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD
}
int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL;
}
@ -773,7 +775,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
}
TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
@ -841,7 +843,7 @@ const char *taos_get_client_info() { return td_version; }
// return int32_t
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -853,7 +855,7 @@ int taos_affected_rows(TAOS_RES *res) {
// return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -864,7 +866,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) {
}
int taos_result_precision(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_TIME_PRECISION_MILLI;
}
@ -904,7 +906,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return;
}
@ -913,7 +915,7 @@ void taos_stop_query(TAOS_RES *res) {
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return true;
}
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
@ -938,7 +940,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -973,7 +975,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
return 0;
} else {
tscError("taos_fetch_block_s invalid res type");
return -1;
return TSDB_CODE_TMQ_INVALID_DATA;
}
}
@ -981,7 +983,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
*numOfRows = 0;
*pData = NULL;
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -1018,7 +1020,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
@ -1038,7 +1040,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
TD_RES_TMQ_BATCH_META(res)) {
TD_RES_TMQ_RAW(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA;
}

View File

@ -1844,9 +1844,10 @@ static void* getRawDataFromRes(void* pRetrieve) {
}
void* rawData = NULL;
// deal with compatibility
if (*(int64_t*)pRetrieve == 0) {
if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
} else if (*(int64_t*)pRetrieve == 1) {
} else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
}
return rawData;
@ -1947,7 +1948,10 @@ static int32_t initRawCacheHash() {
}
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
if (rawData == NULL || pTableMeta == NULL || pSW == NULL) {
if (rawData == NULL || pSW == NULL){
return false;
}
if (pTableMeta == NULL) {
uError("invalid parameter in %s", __func__);
return false;
}
@ -1965,6 +1969,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
return true;
}
for (int i = 0; i < pSW->nCols; i++) {
int j = 0;
for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
@ -1972,7 +1977,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
char* fieldName = pSW->pSchema[i].name;
if (strcmp(pColSchema->name, fieldName) == 0) {
if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
if (checkSchema(pColSchema, fields, NULL, 0) != 0){
return true;
}
break;
@ -2069,7 +2074,7 @@ static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
pMeta == NULL || pSW == NULL || rawData == NULL) {
pMeta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
@ -2168,7 +2173,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
@ -2251,7 +2256,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen)
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
@ -2298,6 +2303,90 @@ end:
return code;
}
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
if (taos == NULL || data == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SHashObj* pVgroupHash = NULL;
SMqRspObj rspObj = {0};
SDecoder decoder = {0};
SRequestObj* pRequest = NULL;
SCatalog* pCatalog = NULL;
SRequestConnInfo conn = {0};
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
SHashObj* pVgHash = NULL;
SHashObj* pNameHash = NULL;
SHashObj* pMetaHash = NULL;
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
int retry = 0;
while (1) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgroupHash);
pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
RAW_NULL_CHECK(pStmt->pVgDataBlocks);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
// find schema data info
STableMeta* pTableMeta = NULL;
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName,
&pTableMeta, NULL, NULL, retry));
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
goto end;
}
}
taosHashCleanup(pVgroupHash);
pVgroupHash = NULL;
RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
qDestroyQuery(pQuery);
pQuery = NULL;
rspObj.resIter = -1;
continue;
}
break;
}
end:
uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteMqDataRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
taosHashCleanup(pVgroupHash);
destroyRequest(pRequest);
return code;
}
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
if (pMetaRsp == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
@ -2472,6 +2561,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
*raw = (tmq_raw_data){0};
SMqRspObj* rspObj = ((SMqRspObj*)res);
if (TD_RES_TMQ_META(res)) {
raw->raw = rspObj->metaRsp.metaRsp;
@ -2499,6 +2589,12 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
raw->raw_type = rspObj->resType;
uDebug("tmq get raw batch meta:%p", raw);
} else if (TD_RES_TMQ_RAW(res)) {
raw->raw = rspObj->dataRsp.rawData;
rspObj->dataRsp.rawData = NULL;
raw->raw_len = rspObj->dataRsp.len;
raw->raw_type = rspObj->resType;
uDebug("tmq get raw raw:%p", raw);
} else {
uError("tmq get raw error type:%d", *(int8_t*)res);
return TSDB_CODE_TMQ_INVALID_MSG;
@ -2508,8 +2604,11 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
void tmq_free_raw(tmq_raw_data raw) {
uDebug("tmq free raw data type:%d", raw.raw_type);
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
if (raw.raw_type == RES_TYPE__TMQ ||
raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw);
} else if(raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL){
taosMemoryFree(POINTER_SHIFT(raw.raw, - sizeof(SMqRspHead)));
}
(void)memset(terrMsg, 0, ERR_MSG_LEN);
}
@ -2559,6 +2658,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
return taosDeleteData(taos, buf, len);
} else if (type == RES_TYPE__TMQ_METADATA) {
return tmqWriteRawMetaDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_RAWDATA) {
return tmqWriteRawRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ) {
return tmqWriteRawDataImpl(taos, buf, len);
} else if (type == RES_TYPE__TMQ_BATCH_META) {

View File

@ -42,18 +42,19 @@
#define PROCESS_POLL_RSP(FUNC,DATA) \
SDecoder decoder = {0}; \
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
if (FUNC(&decoder, DATA) < 0) { \
tDecoderClear(&decoder); \
code = terrno; \
goto END;\
}\
tDecoderClear(&decoder);\
(void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
(void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
#define DELETE_POLL_RSP(FUNC,DATA) \
SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
taosMemoryFreeClear(pRsp->pEpset);\
taosMemoryFreeClear(pRsp->pEpset); \
taosMemoryFreeClear(pRsp->data); \
FUNC(DATA);
enum {
@ -92,6 +93,7 @@ struct tmq_conf_t {
int8_t snapEnable;
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
uint16_t port;
int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
@ -121,6 +123,7 @@ struct tmq_t {
int8_t resetOffsetCfg;
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
int64_t consumerId;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -188,6 +191,8 @@ typedef struct {
SMqClientTopic* topicHandle;
uint64_t reqId;
SEpSet* pEpset;
void* data;
uint32_t len;
union {
struct{
SMqRspHead head;
@ -491,11 +496,17 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcasecmp(key, "msg.consume.excluded") == 0) {
int64_t tmp;
int64_t tmp = 0;
code = taosStr2int64(value, &tmp);
conf->sourceExcluded = (0 == code && tmp != 0) ? TD_REQ_FROM_TAOX : 0;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "msg.consume.rawdata") == 0) {
int64_t tmp = 0;
code = taosStr2int64(value, &tmp);
conf->rawData = (0 == code && tmp != 0) ? 1 : 0;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
@ -812,7 +823,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
goto end;
}
if (TD_RES_TMQ(pRes) || TD_RES_TMQ_META(pRes) ||
if (TD_RES_TMQ(pRes) || TD_RES_TMQ_RAW(pRes) || TD_RES_TMQ_META(pRes) ||
TD_RES_TMQ_METADATA(pRes) || TD_RES_TMQ_BATCH_META(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
@ -1082,6 +1093,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
sendInfo->fp = tmqHbCb;
sendInfo->msgType = TDMT_MND_TMQ_HB;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
@ -1122,6 +1134,8 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
DELETE_POLL_RSP(tDeleteMqBatchMetaRsp,&pRsp->batchMetaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
DELETE_POLL_RSP(tDeleteMqRawDataRsp, &pRsp->dataRsp)
}
}
@ -1733,6 +1747,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->rawData = conf->rawData;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {
@ -2057,23 +2072,14 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END;
}
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, requestId);
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else { // invalid rspType
tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
pRspWrapper->tmqRspType = rspType;
pRspWrapper->pollRsp.reqId = requestId;
pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
pRspWrapper->pollRsp.data = pMsg->pData;
pRspWrapper->pollRsp.len = pMsg->len;
pMsg->pData = NULL;
pMsg->pEpSet = NULL;
END:
@ -2087,8 +2093,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosFreeQitem(pRspWrapper);
tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
} else {
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, vgId, taosQueueItemSize(tmq->mqueue), requestId);
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d,QID:0x%" PRIx64,
tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
}
}
@ -2122,6 +2128,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded;
pReq->rawData = tmq->rawData;
pReq->enableBatchMeta = tmq->enableBatchMeta;
}
@ -2415,6 +2422,35 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
return code;
}
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
int32_t code = 0;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data;
pRspWrapper->pollRsp.data = NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
pRspWrapper->pollRsp.data = NULL;
} else {
tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
END:
return code;
}
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
int32_t code = 0;
SMqRspObj* pRspObj = NULL;
@ -2427,6 +2463,10 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
return pRspObj;
}
code = processWrapperData(pRspWrapper);
if (code != 0) {
goto END;
}
SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
@ -2442,7 +2482,9 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
pVg->epSet = *pollRspWrapper->pEpset;
}
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ||
pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP ||
pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
updateVgInfo(pVg, &pollRspWrapper->dataRsp.reqOffset, &pollRspWrapper->dataRsp.rspOffset, pollRspWrapper->head.walsver, pollRspWrapper->head.walever,
tmq->consumerId, pollRspWrapper->dataRsp.blockNum != 0);
@ -2459,12 +2501,15 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
tqErrorC("consumer:0x%" PRIx64 " failed to allocate memory for meta rsp", tmq->consumerId);
goto END;
}
pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;
pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP ? RES_TYPE__TMQ_RAWDATA :
(pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA);
int64_t numOfRows = 0;
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows;
if (pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows;
}
pVg->emptyBlockReceiveTs = 0;
if (tmq->replayEnable) {
if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
if (pVg->blockSleepForReplay > 0) {
@ -2523,6 +2568,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq) {
returnVal = processMqRsp(tmq, pRspWrapper);
code = terrno;
}
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
if(returnVal != NULL || code != 0){
@ -2575,7 +2621,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
END:
terrno = code;
if (tmq != NULL) {
if (tmq != NULL && terrno != 0) {
tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
}
return NULL;
@ -2688,6 +2734,8 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
return TMQ_RES_METADATA;
} else if (TD_RES_TMQ_BATCH_META(res)) {
return TMQ_RES_TABLE_META;
} else if (TD_RES_TMQ_RAW(res)) {
return TMQ_RES_RAWDATA;
} else {
return TMQ_RES_INVALID;
}
@ -2697,7 +2745,7 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
if (res == NULL) {
return NULL;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
char* tmp = strchr(((SMqRspObj*)res)->topic, '.');
if (tmp == NULL) {
@ -2714,7 +2762,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
return NULL;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
char* tmp = strchr(((SMqRspObj*)res)->db, '.');
if (tmp == NULL) {
@ -2730,7 +2778,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res) || TD_RES_TMQ_META(res)) {
return ((SMqRspObj*)res)->vgId;
} else {
@ -2742,7 +2790,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (res == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
STqOffsetVal* pOffset = &pRspObj->dataRsp.reqOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
@ -2767,7 +2815,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
if (res == NULL) {
return NULL;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) ) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
SMqDataRsp* data = &pRspObj->dataRsp;
if (!data->withTbName || data->blockTbName == NULL || pRspObj->resIter < 0 ||
@ -3670,4 +3718,4 @@ TAOS* tmq_get_connect(tmq_t* tmq) {
return (TAOS*)(&(tmq->pTscObj->id));
}
return NULL;
}
}

View File

@ -9219,6 +9219,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableReplay));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->rawData));
tEndEncode(&encoder);
@ -9272,6 +9273,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
pReq->enableBatchMeta = false;
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->rawData));
}
tEndDecode(&decoder);
_exit:
@ -9279,7 +9284,13 @@ _exit:
return code;
}
void tDestroySMqPollReq(SMqPollReq *pReq) { tOffsetDestroy(&pReq->reqOffset); }
void tDestroySMqPollReq(SMqPollReq *pReq) {
tOffsetDestroy(&pReq->reqOffset);
if (pReq->uidHash != NULL) {
taosHashCleanup(pReq->uidHash);
pReq->uidHash = NULL;
}
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t code = 0;
int32_t lino;
@ -11448,12 +11459,14 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &bLen));
void *data = NULL;
uint32_t bLen = 0;
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t**)&data, &bLen));
if (taosArrayPush(pRsp->blockData, &data) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
pRsp->blockDataElementFree = false;
int32_t len = bLen;
if (taosArrayPush(pRsp->blockDataLen, &len) == NULL) {
TAOS_CHECK_EXIT(terrno);
@ -11499,10 +11512,25 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
int32_t tDecodeMqRawDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset));
TAOS_CHECK_EXIT(tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->blockNum));
_exit:
return code;
}
static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, NULL);
if (pRsp->blockDataElementFree){
taosArrayDestroyP(pRsp->blockData, NULL);
} else {
taosArrayDestroy(pRsp->blockData);
}
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;
@ -11510,6 +11538,7 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
pRsp->blockTbName = NULL;
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
taosMemoryFreeClear(pRsp->data);
}
void tDeleteMqDataRsp(SMqDataRsp *rsp) { tDeleteMqDataRspCommon(rsp); }
@ -11572,6 +11601,14 @@ void tDeleteSTaosxRsp(SMqDataRsp *pRsp) {
pRsp->createTableReq = NULL;
}
void tDeleteMqRawDataRsp(SMqDataRsp *pRsp) {
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
if (pRsp->rawData != NULL){
taosMemoryFree(POINTER_SHIFT(pRsp->rawData, - sizeof(SMqRspHead)));
}
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->tbname));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->startTs));
@ -11658,6 +11695,26 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq
_exit:
return code;
}
int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver){
int32_t code = 0;
int32_t lino = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
int32_t flags = 0;
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &flags));
flags |= TD_REQ_FROM_TAOX;
flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
SEncoder encoder = {0};
tEncoderInit(&encoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, flags));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, suid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, uid));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, sver));
_exit:
return code;
}
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
int32_t code = 0;
@ -11705,14 +11762,20 @@ _exit:
return code;
}
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) {
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, void* rawData) {
int32_t code = 0;
int32_t lino;
int32_t flags;
uint8_t version;
uint8_t* dataAfterCreate = NULL;
uint8_t* dataStart = pCoder->data + pCoder->pos;
uint32_t posAfterCreate = 0;
TAOS_CHECK_EXIT(tStartDecode(pCoder));
uint32_t pos = pCoder->pos;
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags));
uint32_t flagsLen = pCoder->pos - pos;
pSubmitTbData->flags = flags & 0xff;
version = (flags >> 8) & 0xff;
@ -11724,6 +11787,8 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
}
TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq));
dataAfterCreate = pCoder->data + pCoder->pos;
posAfterCreate = pCoder->pos;
}
// submit data
@ -11732,7 +11797,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &pSubmitTbData->sver));
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData;
uint64_t nColData = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nColData));
@ -11745,7 +11810,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1)));
}
} else {
uint64_t nRow;
uint64_t nRow = 0;
TAOS_CHECK_EXIT(tDecodeU64v(pCoder, &nRow));
pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *));
@ -11768,6 +11833,15 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->ctimeMs));
}
if (rawData != NULL){
if (dataAfterCreate != NULL){
TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen);
*(int32_t*)(dataAfterCreate - INT_BYTES - flagsLen) = pCoder->pos - posAfterCreate + flagsLen;
*(void**)rawData = dataAfterCreate - INT_BYTES - flagsLen;
}else{
*(void**)rawData = dataStart;
}
}
tEndDecode(pCoder);
_exit:
@ -11780,15 +11854,27 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
TAOS_CHECK_EXIT(tStartEncode(pCoder));
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)));
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)));
if (pReq->raw){
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
void* data = taosArrayGetP(pReq->aSubmitTbData, i);
if (pCoder->data != NULL){
TAOS_MEMCPY(pCoder->data + pCoder->pos, data, *(uint32_t*)data + INT_BYTES);
}
pCoder->pos += *(uint32_t*)data + INT_BYTES;
}
} else{
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)));
}
}
tEndEncode(pCoder);
_exit:
return code;
}
int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
int32_t code = 0;
memset(pReq, 0, sizeof(*pReq));
@ -11812,7 +11898,8 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
}
for (uint64_t i = 0; i < nSubmitTbData; i++) {
if (tDecodeSSubmitTbData(pCoder, taosArrayReserve(pReq->aSubmitTbData, 1)) < 0) {
SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1);
if (tDecodeSSubmitTbData(pCoder, data, rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
@ -11821,13 +11908,6 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq) {
tEndDecode(pCoder);
_exit:
if (code) {
if (pReq->aSubmitTbData) {
// todo
taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL;
}
}
return code;
}
@ -11883,12 +11963,15 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return;
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);
if (!pReq->raw){
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);
for (int32_t i = 0; i < nSubmitTbData; i++) {
tDestroySubmitTbData(&aSubmitTbData[i], flag);
for (int32_t i = 0; i < nSubmitTbData; i++) {
tDestroySubmitTbData(&aSubmitTbData[i], flag);
}
}
taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL;
}

View File

@ -2731,7 +2731,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
terrno = 0;
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
code = terrno;
goto _end;
}

View File

@ -242,11 +242,11 @@ SSDataBlock *tqGetResultBlock(STqReader *pReader);
int64_t tqGetResultBlockTime(STqReader *pReader);
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver, SArray* rawList);
void tqReaderClearSubmitMsg(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet,
int64_t *createTime);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SMqDataRsp* pRsp, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta);
int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished);
// sma

View File

@ -90,6 +90,7 @@ typedef struct {
// for replay
SSDataBlock* block;
int64_t blockTime;
SHashObj* tableCreateTimeHash; // for process create table msg in submit if fetch raw data
} STqHandle;
struct STQ {
@ -117,8 +118,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -147,7 +148,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
@ -183,6 +184,7 @@ int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode*
#define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0"
#define TQ_POLL_MAX_TIME 1000
#define TQ_POLL_MAX_BYTES 1048576
#ifdef __cplusplus
}

View File

@ -167,7 +167,8 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock);
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);

View File

@ -378,7 +378,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime) {
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
void *pData = NULL;
int nData = 0;
int64_t version;
@ -414,9 +414,6 @@ _query:
}
} else if (me.type == TSDB_CHILD_TABLE) {
uid = me.ctbEntry.suid;
if (createTime != NULL){
*createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc);
goto _query;
} else {
@ -455,6 +452,46 @@ _err:
return NULL;
}
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock) {
void *pData = NULL;
int nData = 0;
int64_t version = 0;
SDecoder dc = {0};
int64_t createTime = INT64_MAX;
if (lock) {
metaRLock(pMeta);
}
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
goto _exit;
}
version = ((SUidIdxVal *)pData)[0].version;
if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) != 0) {
goto _exit;
}
SMetaEntry me = {0};
tDecoderInit(&dc, pData, nData);
int32_t code = metaDecodeEntry(&dc, &me);
if (code) {
tDecoderClear(&dc);
goto _exit;
}
if (me.type == TSDB_CHILD_TABLE) {
createTime = me.ctbEntry.btime;
}
tDecoderClear(&dc);
_exit:
if (lock) {
metaULock(pMeta);
}
tdbFree(pData);
return createTime;
}
SMCtbCursor *metaOpenCtbCursor(void *pVnode, tb_uid_t uid, int lock) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMCtbCursor *pCtbCur = NULL;
@ -627,7 +664,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL;
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);

View File

@ -548,7 +548,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}

View File

@ -57,6 +57,7 @@ void tqDestroyTqHandle(void* data) {
if (pData->pRef) {
walCloseRef(pData->pRef->pWal, pData->pRef->refId);
}
taosHashCleanup(pData->tableCreateTimeHash);
}
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
@ -211,7 +212,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tDeleteMqDataRsp(&dataRsp);
}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type,
int32_t vgId) {
if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -414,7 +415,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
terrno = TSDB_CODE_INVALID_MSG;
goto END;
}
if (req.rawData == 1){
req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (req.uidHash == NULL) {
tqError("tq poll rawData taosHashInit failed");
code = terrno;
goto END;
}
}
int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;

View File

@ -343,6 +343,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
taosArrayDestroy(tbUidList);
}
handle->tableCreateTimeHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
END:
return code;

View File

@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
return;
}
bool ret = false;
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
ret = true;
}
@ -352,7 +352,7 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
return TSDB_CODE_INVALID_PARA;
}
if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
return -1;
return terrno;
}
tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
return 0;
@ -485,14 +485,14 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
return false;
}
pReader->nextBlk = 0;
}
}
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
if (pReader == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -500,11 +500,11 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
pReader->msg.msgLen = msgLen;
pReader->msg.ver = ver;
tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
SDecoder decoder = {0};
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
tDecoderClear(&decoder);
if (code != 0) {
@ -514,6 +514,13 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
return code;
}
void tqReaderClearSubmitMsg(STqReader *pReader) {
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
}
SWalReader* tqGetWalReader(STqReader* pReader) {
if (pReader == NULL) {
return NULL;
@ -551,17 +558,15 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
tqReaderClearSubmitMsg(pReader);
tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
tqTrace("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -581,17 +586,14 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
uid = pSubmitTbData->uid;
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_NULL(ret, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
tqReaderClearSubmitMsg(pReader);
tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
tqTrace("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -740,7 +742,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
(pReader->cachedSchemaVer != sversion)) {
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
"version %d, possibly dropped table",
@ -934,7 +936,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
TQ_NULL_GO_TO_END(pCol);
int32_t numOfRows = pCol->nVal;
int32_t numOfCols = taosArrayGetSize(pCols);
tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -974,7 +976,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -997,7 +999,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
int32_t numOfRows = taosArrayGetSize(pRows);
pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
TQ_NULL_GO_TO_END(pTSchema);
tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -1036,7 +1038,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -1046,8 +1048,46 @@ END:
return code;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
int32_t code = 0;
int32_t lino = 0;
void* createReq = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
TSDB_CHECK_CODE(code, lino, END);
createReq = taosMemoryCalloc(1, len);
TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, END);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqTrace("build create table info msg success");
END:
if (code != 0){
tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
taosMemoryFree(createReq);
}
return code;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return terrno;
@ -1063,7 +1103,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
pReader->lastBlkUid = uid;
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
@ -1071,6 +1111,19 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
}
if (pSubmitTbData->pCreateTbReq != NULL) {
int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
if (code != 0) {
return code;
}
} else if (rawList != NULL) {
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
return terrno;
}
pReader->pSchemaWrapper = NULL;
return 0;
}
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
} else {

View File

@ -14,6 +14,33 @@
*/
#include "tq.h"
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
int32_t code = TDB_CODE_SUCCESS;
int32_t lino = 0;
void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES;
buf = taosMemoryCalloc(1, dataStrLen);
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
pRetrieve->precision = precision;
pRetrieve->compressed = 0;
memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
pRsp->blockDataElementFree = true;
tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
END:
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
}
return code;
}
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t code = 0;
@ -25,7 +52,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = 1;
pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
pRetrieve->precision = precision;
pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
@ -36,13 +63,14 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp,
actualLen += sizeof(SRetrieveTableRspForTmq);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
pRsp->blockDataElementFree = true;
tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
buf = NULL;
END:
if (code != 0){
if (code != TSDB_CODE_SUCCESS){
taosMemoryFree(buf);
tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
}
taosMemoryFree(buf);
return code;
}
@ -66,7 +94,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
continue;
}
tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
}
END:
@ -234,7 +262,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
tbName = NULL;
}
if (pRsp->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
pSW = NULL;
@ -290,45 +318,8 @@ END:
return code;
}
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
int32_t code = 0;
int32_t lino = 0;
void* createReq = NULL;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
}
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
TSDB_CHECK_CODE(code, lino, END);
createReq = taosMemoryCalloc(1, len);
TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
tEncoderClear(&encoder);
TSDB_CHECK_CODE(code, lino, END);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqDebug("build create table info msg success");
END:
if (code != 0){
tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
taosMemoryFree(createReq);
}
return code;
}
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
const SMqPollReq* pRequest, SArray* rawList){
int32_t code = 0;
int32_t lino = 0;
SArray* pBlocks = NULL;
@ -342,78 +333,161 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
pSchemas = taosArrayInit(0, sizeof(void*));
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
SSubmitTbData* pSubmitTbDataRet = NULL;
int64_t createTime = INT64_MAX;
code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
SSubmitTbData* pSubmitTbData = NULL;
code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, &pSubmitTbData, rawList, pHandle->fetchMeta);
TSDB_CHECK_CODE(code, lino, END);
bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0;
bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
TSDB_CHECK_CODE(code, lino, END);
}
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq
code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
TSDB_CHECK_CODE(code, lino, END);
}
}
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL);
tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL);
TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
if (pBlock == NULL) {
continue;
for (int32_t i = 0; i < blockNum; i++) {
if (taosArrayGetSize(pBlocks) == 0){
void* rawData = taosArrayGetP(rawList, pReader->nextBlk - 1);
if (rawData == NULL) {
continue;
}
if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
} else {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
if (pBlock == NULL) {
continue;
}
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += pBlock->info.rows;
}
if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
void** pSW = taosArrayGet(pSchemas, i);
if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*pSW = NULL;
pRsp->blockNum++;
}
tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
if (code != 0){
if (code != 0) {
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
} else {
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
}
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
for (int32_t i = 0; i < blockSz; i++){
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
if (pSubmitTbData== NULL){
taosArrayDestroy(*rawList);
*rawList = NULL;
return;
}
int64_t uid = pSubmitTbData->uid;
if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
return;
} else {
int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
if (code != 0){
tqError("failed to add table uid to hash, code:%d, uid:%"PRId64, code, uid);
}
}
if (pSubmitTbData->pCreateTbReq == NULL){
continue;
}
int64_t createTime = INT64_MAX;
int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
if (cTime != NULL){
createTime = *cTime;
} else{
createTime = metaGetTableCreateTime(pReader->pVnodeMeta, uid, 1);
if (createTime != INT64_MAX){
int32_t code = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
if (code != 0){
tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, code, uid);
}
}
}
if (pHandle->fetchMeta == WITH_DATA || pSubmitTbData->ctimeMs > createTime){
tDestroySVCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
pSubmitTbData->pCreateTbReq = NULL;
} else{
taosArrayDestroy(*rawList);
*rawList = NULL;
}
}
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
int32_t code = 0;
int32_t lino = 0;
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA);
STqExecHandle* pExec = &pHandle->execHandle;
STqReader* pReader = pExec->pTqReader;
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
SArray *rawList = NULL;
if (pRequest->rawData){
rawList = taosArrayInit(0, POINTER_BYTES);
TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
}
code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList);
TSDB_CHECK_CODE(code, lino, END);
if (pRequest->rawData) {
preProcessSubmitMsg(pHandle, pRequest, &rawList);
}
// data could not contains same uid data in rawdata mode
if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
goto END;
}
// this submit data is metadata and previous data is rawdata
if (pRequest->rawData != 0 && *totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL){
tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
goto END;
}
// this submit data is rawdata and previous data is metadata
if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
goto END;
}
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
}
}
END:
tqReaderClearSubmitMsg(pReader);
taosArrayDestroy(rawList);
if (code != 0){
tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
}

View File

@ -131,7 +131,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
}
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true;
@ -222,6 +222,10 @@ end:
static void tDeleteCommon(void* parm) {}
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -274,7 +278,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
}
@ -287,7 +291,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
goto END;
}
@ -370,12 +374,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version,
};
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded));
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;}
goto END;
} else {
fetchVer++;
@ -526,7 +537,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll
return 0;
}
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever) {
if (pRpcHandleInfo == NULL || pRsp == NULL) {
return TSDB_CODE_TMQ_INVALID_MSG;
@ -534,7 +545,12 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
pRsp->withSchema = 0;
}
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
type == TMQ_MSG_TYPE__WALINFO_RSP ||
type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
@ -558,7 +574,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
type == TMQ_MSG_TYPE__WALINFO_RSP ||
type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
code = tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
code = tEncodeSTaosxRsp(&encoder, pRsp);

View File

@ -739,7 +739,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
}
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0);
if (pSW) {
*num = pSW->nCols;
tDeleteSchemaWrapper(pSW);

View File

@ -314,25 +314,25 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
uint64_t nColData;
if (tDecodeU64v(pCoder, &nColData) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColData colData = {0};
code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iRow = 0; iRow < colData.nVal; iRow++) {
if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -340,14 +340,14 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else {
uint64_t nRow;
if (tDecodeU64v(pCoder, &nRow) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
@ -356,7 +356,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
if (pRow->ts < minKey || pRow->ts > maxKey) {
code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
@ -369,6 +369,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
tEndDecode(pCoder);
_exit:
if (code) {
vError("vgId:%d, %s:%d failed to vnodePreProcessSubmitTbData submit request since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
@ -1889,7 +1892,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
len -= sizeof(SSubmitReq2Msg);
SDecoder dc = {0};
tDecoderInit(&dc, pReq, len);
if (tDecodeSubmitReq(&dc, pSubmitReq) < 0) {
if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}

View File

@ -187,7 +187,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_SUCCESS;
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
goto _end;
}

View File

@ -3953,7 +3953,7 @@ FETCH_NEXT_BLOCK:
QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) <
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, NULL) <
0) {
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current,
totalBlocks, id);

View File

@ -849,11 +849,6 @@ static int32_t physiDispatchCopy(const SDataDispatcherNode* pSrc, SDataDispatche
return TSDB_CODE_SUCCESS;
}
static int32_t physiInserterCopy(const SDataInserterNode* pSrc, SDataInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
return TSDB_CODE_SUCCESS;
}
static int32_t physiQueryInserterCopy(const SQueryInserterNode* pSrc, SQueryInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
CLONE_NODE_LIST_FIELD(pCols);
@ -1135,9 +1130,6 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = physiDispatchCopy((const SDataDispatcherNode*)pNode, (SDataDispatcherNode*)pDst);
break;
//case QUERY_NODE_PHYSICAL_PLAN_INSERT:
// code = physiInserterCopy((const SDataInserterNode*)pNode, (SDataInserterNode*)pDst);
// break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
code = physiQueryInserterCopy((const SQueryInserterNode*)pNode, (SQueryInserterNode*)pDst);
break;

View File

@ -2000,7 +2000,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_INSERT: {
SDataInserterNode* pSink = (SDataInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
taosMemoryFreeClear(pSink->pData);
taosMemFreeClear(pSink->pData);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {

View File

@ -403,3 +403,18 @@ end:
}
return code;
}
int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash) {
int32_t lino = 0;
int32_t code = 0;
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
TSDB_CHECK_CODE(code, lino, end);
end:
if (code != 0) {
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -483,7 +483,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
return code;
}
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList,
static int32_t createVgroupDataCxt(int32_t vgId, SHashObj* pVgroupHash, SArray* pVgroupList,
SVgroupDataCxt** pOutput) {
SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
if (NULL == pVgCxt) {
@ -495,7 +495,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
return terrno;
}
pVgCxt->vgId = pTableCxt->pMeta->vgId;
pVgCxt->vgId = vgId;
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == taosArrayPush(pVgroupList, &pVgCxt)) {
@ -642,7 +642,7 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
if (NULL == pp) {
pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pp) {
code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
code = createVgroupDataCxt(vgId, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
} else {
pVgCxt = *(SVgroupDataCxt**)pp;
}
@ -723,7 +723,7 @@ int32_t tbNum) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFun
*/
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
if (NULL == pVgroupHash || NULL == pVgroupList) {
taosHashCleanup(pVgroupHash);
@ -777,7 +777,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool
int32_t vgId = pTableCxt->pMeta->vgId;
void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pp) {
code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
code = createVgroupDataCxt(vgId, pVgroupHash, pVgroupList, &pVgCxt);
} else {
pVgCxt = *(SVgroupDataCxt**)pp;
}
@ -830,6 +830,7 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
}
static void destroyVgDataBlocks(void* p) {
if (p == NULL) return;
SVgDataBlocks* pVg = p;
taosMemoryFree(pVg->pData);
taosMemoryFree(pVg);
@ -855,7 +856,6 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
@ -863,6 +863,9 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);
}
if (TSDB_CODE_SUCCESS != code) {
destroyVgDataBlocks(dst);
}
}
if (append) {
@ -1074,3 +1077,33 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
end:
return ret;
}
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data) {
transformRawSSubmitTbData(data, pTableMeta->suid, pTableMeta->uid, pTableMeta->sversion);
SVgroupDataCxt* pVgCxt = NULL;
void** pp = taosHashGet(pVgroupHash, &pTableMeta->vgId, sizeof(pTableMeta->vgId));
if (NULL == pp) {
int code = createVgroupDataCxt(pTableMeta->vgId, pVgroupHash, pVgroupList, &pVgCxt);
if (code != 0){
return code;
}
} else {
pVgCxt = *(SVgroupDataCxt**)pp;
}
if (NULL == pVgCxt->pData->aSubmitTbData) {
pVgCxt->pData->aSubmitTbData = taosArrayInit(0, POINTER_BYTES);
pVgCxt->pData->raw = true;
if (NULL == pVgCxt->pData->aSubmitTbData) {
return terrno;
}
}
// push data to submit, rebuild empty data for next submit
if (NULL == taosArrayPush(pVgCxt->pData->aSubmitTbData, &data)) {
return terrno;
}
uTrace("add raw data to vgId:%d, len:%d", pTableMeta->vgId, *(int32_t*)data);
return 0;
}

View File

@ -2839,7 +2839,8 @@ static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlock
pInserter->numOfTables = pBlocks->numOfTables;
pInserter->size = pBlocks->size;
TSWAP(pInserter->pData, pBlocks->pData);
pInserter->pData = pBlocks->pData;
pBlocks->pData = NULL;
*pSink = (SDataSinkNode*)pInserter;
return TSDB_CODE_SUCCESS;

View File

@ -1110,7 +1110,7 @@ _return:
}
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) {
int32_t msgSize = 0;
int32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
@ -1136,13 +1136,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_SUBMIT:
case TDMT_VND_COMMIT: {
msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(terrno);
}
TAOS_MEMCPY(msg, pTask->msg, msgSize);
msg = pTask->msg;
pTask->msg = NULL;
break;
}

View File

@ -163,7 +163,7 @@ void* rpcMallocCont(int64_t contLen) {
tTrace("malloc mem:%p size:%" PRId64, start, size);
}
return start + sizeof(STransMsgHead);
return start + TRANS_MSG_OVERHEAD;
}
void rpcFreeCont(void* cont) { transFreeMsg(cont); }

View File

@ -859,6 +859,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_DATA, "Invalid data use here")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_RAW_DATA_SPLIT, "Split submit data for rawdata")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")

View File

@ -61,7 +61,8 @@ ignoreCodes = [
'0x80003107', '0x80003108', '0x80003109', '0x80003110', '0x80003111', '0x80003112', '0x80003250', '0x80004003', '0x80004004', '0x80004005',
'0x80004006', '0x80004007', '0x80004008', '0x80004009', '0x80004010', '0x80004011', '0x80004012', '0x80004013', '0x80004014', '0x80004015',
'0x80004016', '0x80004102', '0x80004103', '0x80004104', '0x80004105', '0x80004106', '0x80004107', '0x80004108', '0x80004109', '0x80005100',
'0x80005101', '0x80006000', '0x80006100', '0x80006101', '0x80006102', '0x80000019', '0x80002639', '0x80002666', '0x80000237']
'0x80005101', '0x80006000', '0x80006100', '0x80006101', '0x80006102', '0x80000019', '0x80002639', '0x80002666', '0x80000237', '0x80004018',
'0x80004019']
class TDTestCase(TBase):

View File

@ -455,6 +455,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5776.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py

File diff suppressed because one or more lines are too long

View File

@ -17,6 +17,8 @@ from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
clientCfgDict = {'debugFlag': 135, 'asynclog': 0}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
@ -65,7 +67,7 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(3)
tdSql.checkRows(5)
tdSql.checkData(0, 1, 1)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
@ -96,7 +98,7 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(3)
tdSql.checkRows(5)
tdSql.checkData(0, 1, 1)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
@ -256,6 +258,17 @@ class TDTestCase:
return
def checkWalMultiVgroupsRawData(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -raw'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkData()
self.checkDropData(False)
return
def checkWalMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'%(buildPath)
@ -681,6 +694,7 @@ class TDTestCase:
self.checkSnapshot1VgroupTable()
self.checkWalMultiVgroups()
self.checkWalMultiVgroupsRawData()
self.checkSnapshotMultiVgroups()
self.checkWalMultiVgroupsWithDropTable()

View File

@ -0,0 +1,39 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_ts5776'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
return
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -6,6 +6,7 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(tmq_ts5466 tmq_ts5466.c)
add_executable(tmq_td32526 tmq_td32526.c)
add_executable(tmq_td32187 tmq_td32187.c)
add_executable(tmq_ts5776 tmq_ts5776.c)
add_executable(tmq_td32471 tmq_td32471.c)
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
add_executable(write_raw_block_test write_raw_block_test.c)
@ -87,6 +88,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_ts5776
PUBLIC ${TAOS_LIB}
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_taosx_ci
PUBLIC ${TAOS_LIB}

View File

@ -34,6 +34,7 @@ typedef struct {
int dstVgroups;
char dir[256];
bool btMeta;
bool rawData;
} Config;
Config g_conf = {0};
@ -424,6 +425,15 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
}
taos_free_result(pRes);
pRes =
taos_query(pConn,
"insert into stt1 values(now + 322s, 3, 2, 'stt1')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes =
taos_query(pConn,
"insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + "
@ -436,6 +446,15 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
}
taos_free_result(pRes);
pRes =
taos_query(pConn,
"insert into stt1 values(now + 442s, 3, 2, 'stt1')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
@ -633,6 +652,9 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
if (g_conf.rawData) {
tmq_conf_set(conf, "msg.consume.rawdata", "1");
}
// tmq_conf_set(conf, "session.timeout.ms", "1000000");
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
@ -1238,6 +1260,8 @@ int main(int argc, char* argv[]) {
g_conf.meta = 1;
} else if (strcmp(argv[i], "-bt") == 0) {
g_conf.btMeta = true;
} else if (strcmp(argv[i], "-raw") == 0) {
g_conf.rawData = true;
}
}

255
utils/test/c/tmq_ts5776.c Normal file
View File

@ -0,0 +1,255 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
static TAOS* use_db() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return NULL;
}
TAOS_RES* pRes = taos_query(pConn, "use db_dst");
if (taos_errno(pRes) != 0) {
printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes));
return NULL;
}
taos_free_result(pRes);
return pConn;
}
static void msg_process(TAOS_RES* msg) {
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
TAOS* pConn = use_db();
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) {
char* result = tmq_get_json_meta(msg);
printf("meta result: %s\n", result);
tmq_free_json_meta(result);
}
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
printf("write raw data type: %d\n", raw.raw_type);
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
ASSERT(ret == 0);
tmq_free_raw(raw);
taos_close(pConn);
}
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(
pConn,
"insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c') ct1 using st1(t1) tags(2000) values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
"'ddd') ct0 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj') ct1 using st1(t1) tags(2000) values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(
pConn,
"insert into ct3 values(1626006839602, 5, 6, 'c')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(
pConn,
"insert into ct3 using st1(t1) tags(3000) values(1626006833602, 5, 6, 'c') ct1 values(1626006833611, 2, 3, 'sds') (1626006833612, 4, 5, "
"'ddd') ct10 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_dst");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_dst vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists db_src");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_src vgroups 1 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_src");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
buildDatabase(pConn, pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "create topic topic_db with meta as database db_src");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.rawdata", "1");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
taos_free_result(tmqmessage);
} else {
break;
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
void check_result(){
TAOS* taos = use_db();
TAOS_RES *res = taos_query(taos, "select * from st1");
int code = taos_errno(res);
assert(code == 0);
int affectedRows = taos_affected_rows(res);
printf("affected rows %d\n", affectedRows);
assert(affectedRows == 10);
taos_free_result(res);
taos_close(taos);
}
int main(int argc, char* argv[]) {
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list);
}