enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
parent
5f9de6ac70
commit
42b7520d34
|
@ -210,7 +210,7 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
|||
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
||||
const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo);
|
||||
//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner);
|
||||
|
||||
|
|
|
@ -53,9 +53,7 @@
|
|||
#define TMQ_META_VERSION "1.0"
|
||||
|
||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
||||
|
||||
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
||||
|
||||
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
|
||||
SColCmprWrapper* pColCmprRow, cJSON** pJson) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1856,6 +1854,12 @@ static threadlocal SHashObj* pCreateTbHash = NULL;
|
|||
static threadlocal SHashObj* pNameHash = NULL;
|
||||
static threadlocal SHashObj* pMetaHash = NULL;
|
||||
|
||||
typedef struct{
|
||||
SVgroupInfo vgInfo;
|
||||
int64_t uid;
|
||||
int64_t suid;
|
||||
}tbInfo;
|
||||
|
||||
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){
|
||||
char* p = (char*)rawData;
|
||||
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
|
||||
|
@ -1878,7 +1882,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
|
|||
char* fieldName = pSW->pSchema[i].name;
|
||||
|
||||
if (strcmp(pColSchema->name, fieldName) == 0) {
|
||||
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||
if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
|
@ -1892,11 +1896,6 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
|
|||
return false;
|
||||
}
|
||||
|
||||
static void tmqFreeMeta(void *data){
|
||||
STableMeta* pTableMeta = *(STableMeta**)data;
|
||||
taosMemoryFree(pTableMeta);
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) {
|
||||
if (taos == NULL || data == NULL) {
|
||||
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
||||
|
@ -1933,23 +1932,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (pVgHash == NULL){
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pVgHash);
|
||||
}
|
||||
if (pCreateTbHash == NULL){
|
||||
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pCreateTbHash);
|
||||
}
|
||||
if (pNameHash == NULL){
|
||||
pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pNameHash);
|
||||
}
|
||||
if (pMetaHash == NULL){
|
||||
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pMetaHash);
|
||||
taosHashSetFreeFp(pMetaHash, tmqFreeMeta);
|
||||
}
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
|
||||
|
||||
|
@ -1959,90 +1941,119 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
conn.requestObjRefId = pRequest->self;
|
||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||
int retry = 0;
|
||||
while(1){
|
||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pRetrieve);
|
||||
if (!rspObj.dataRsp.withSchema) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
||||
RAW_NULL_CHECK(tbName);
|
||||
|
||||
int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter);
|
||||
RAW_NULL_CHECK(suid);
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||
(void)strcpy(pName.dbname, pRequest->pDb);
|
||||
(void)strcpy(pName.tname, tbName);
|
||||
|
||||
// find schema data info
|
||||
SVCreateTbReq* pCreateReqDst = NULL;
|
||||
if (type == RES_TYPE__TMQ_METADATA){
|
||||
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||
if (pCreateReqDst == NULL) {
|
||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
|
||||
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vgId = 0;
|
||||
SVgroupInfo* vg = (SVgroupInfo*)taosHashGet(pNameHash, tbName, strlen(tbName));
|
||||
if (vg == NULL) {
|
||||
SVgroupInfo vgTmp = {0};
|
||||
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgTmp));
|
||||
RAW_RETURN_CHECK(taosHashPut(pNameHash, tbName, strlen(tbName), &vgTmp, sizeof(SVgroupInfo)));
|
||||
code = taosHashPut(pVgHash, &vgTmp.vgId, sizeof(vgTmp.vgId), &vgTmp, sizeof(SVgroupInfo));
|
||||
code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
||||
RAW_RETURN_CHECK(code);
|
||||
vgId = vgTmp.vgId;
|
||||
} else {
|
||||
vgId = vg->vgId;
|
||||
}
|
||||
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pSW);
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
RAW_NULL_CHECK(rawData);
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, suid, LONG_BYTES);
|
||||
if (pTableMetaTmp == NULL || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
||||
if (pCreateReqDst) { // change stable name to get meta
|
||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||
}
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
||||
code = taosHashPut(pMetaHash, suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
taosMemoryFree(pTableMeta);
|
||||
uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pRetrieve);
|
||||
if (!rspObj.dataRsp.withSchema) {
|
||||
goto end;
|
||||
}
|
||||
if (pCreateReqDst) {
|
||||
pTableMeta->vgId = vgId;
|
||||
pTableMeta->uid = pCreateReqDst->uid;
|
||||
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
||||
|
||||
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
||||
RAW_NULL_CHECK(tbName);
|
||||
|
||||
// int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter);
|
||||
// RAW_NULL_CHECK(suid);
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||
(void)strcpy(pName.dbname, pRequest->pDb);
|
||||
(void)strcpy(pName.tname, tbName);
|
||||
|
||||
// find schema data info
|
||||
SVCreateTbReq* pCreateReqDst = NULL;
|
||||
if (type == RES_TYPE__TMQ_METADATA){
|
||||
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||
if (pCreateReqDst == NULL) {
|
||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
|
||||
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||
}
|
||||
}
|
||||
}else{
|
||||
pTableMeta = *pTableMetaTmp;
|
||||
}
|
||||
STableMeta* pTableMeta = NULL;
|
||||
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, tbName, strlen(tbName));
|
||||
if (tmpInfo == NULL || retry > 0) {
|
||||
tbInfo info = {0};
|
||||
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
||||
goto end;
|
||||
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &info.vgInfo));
|
||||
if (pCreateReqDst) { // change stable name to get meta
|
||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||
}
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
||||
info.uid = pTableMeta->uid;
|
||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE){
|
||||
info.suid = pTableMeta->suid;
|
||||
} else {
|
||||
info.suid = pTableMeta->uid;
|
||||
}
|
||||
code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
}
|
||||
if (pCreateReqDst) {
|
||||
pTableMeta->vgId = info.vgInfo.vgId;
|
||||
pTableMeta->uid = pCreateReqDst->uid;
|
||||
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
||||
}
|
||||
|
||||
code = taosHashPut(pNameHash, pName.tname, strlen(pName.tname), &info, sizeof(tbInfo));
|
||||
code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
||||
RAW_RETURN_CHECK(code);
|
||||
code = taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo));
|
||||
code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
||||
RAW_RETURN_CHECK(code);
|
||||
}
|
||||
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pSW);
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
RAW_NULL_CHECK(rawData);
|
||||
|
||||
if (pTableMeta == NULL || retry > 0){
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
|
||||
if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
||||
code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
}
|
||||
|
||||
}else{
|
||||
pTableMeta = *pTableMetaTmp;
|
||||
pTableMeta->uid = tmpInfo->uid;
|
||||
pTableMeta->vgId = tmpInfo->vgInfo.vgId;
|
||||
}
|
||||
}
|
||||
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
|
||||
if (retry++ >= 3) {
|
||||
break;
|
||||
}
|
||||
qDestroyQuery(pQuery);
|
||||
pQuery = NULL;
|
||||
rspObj.resIter = -1;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
if (type == RES_TYPE__TMQ_METADATA){
|
||||
|
@ -2056,18 +2067,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
return code;
|
||||
}
|
||||
|
||||
void tmqClean() {
|
||||
taosHashCleanup(pMetaHash);
|
||||
taosHashCleanup(pNameHash);
|
||||
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
||||
while (pIter) {
|
||||
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
||||
pIter = taosHashIterate(pCreateTbHash, pIter);
|
||||
}
|
||||
taosHashCleanup(pCreateTbHash);
|
||||
taosHashCleanup(pVgHash);
|
||||
}
|
||||
|
||||
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
|
||||
if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
|
||||
processCreateStb(pMetaRsp, meta);
|
||||
|
@ -2263,7 +2262,53 @@ void tmq_free_raw(tmq_raw_data raw) {
|
|||
(void)memset(terrMsg, 0, ERR_MSG_LEN);
|
||||
}
|
||||
|
||||
static void tmqFreeMeta(void *data){
|
||||
STableMeta* pTableMeta = *(STableMeta**)data;
|
||||
taosMemoryFree(pTableMeta);
|
||||
}
|
||||
|
||||
void freeHash() {
|
||||
taosHashCleanup(pMetaHash);
|
||||
taosHashCleanup(pNameHash);
|
||||
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
||||
while (pIter) {
|
||||
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
||||
pIter = taosHashIterate(pCreateTbHash, pIter);
|
||||
}
|
||||
taosHashCleanup(pCreateTbHash);
|
||||
taosHashCleanup(pVgHash);
|
||||
}
|
||||
|
||||
static int32_t initHash(){
|
||||
int32_t code = 0;
|
||||
if (pVgHash == NULL){
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pVgHash);
|
||||
}
|
||||
if (pCreateTbHash == NULL){
|
||||
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pCreateTbHash);
|
||||
}
|
||||
if (pNameHash == NULL){
|
||||
pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pNameHash);
|
||||
}
|
||||
if (pMetaHash == NULL){
|
||||
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pMetaHash);
|
||||
taosHashSetFreeFp(pMetaHash, tmqFreeMeta);
|
||||
}
|
||||
return code;
|
||||
end:
|
||||
freeHash();
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
|
||||
int32_t code = initHash();
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
if (type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, buf, len);
|
||||
} else if (type == TDMT_VND_ALTER_STB) {
|
||||
|
|
|
@ -1064,7 +1064,7 @@ END:
|
|||
tDestroySMqHbReq(&req);
|
||||
if (tmrId != NULL) {
|
||||
bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq hb:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
|
||||
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
|
||||
}
|
||||
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
if (ret != 0){
|
||||
|
|
|
@ -10716,42 +10716,42 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
if (pRsp->withTbName) {
|
||||
int64_t* suid = taosArrayGet(pRsp->blockSuid, i);
|
||||
if (suid != NULL){
|
||||
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid));
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
//int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){
|
||||
// for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
// if (pRsp->withTbName) {
|
||||
// int64_t* suid = taosArrayGet(pRsp->blockSuid, i);
|
||||
// if (suid != NULL){
|
||||
// TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// return 0;
|
||||
//}
|
||||
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
||||
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
|
||||
TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp));
|
||||
// TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp));
|
||||
|
||||
return 0;
|
||||
}
|
||||
int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (pRsp->withTbName) {
|
||||
if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) {
|
||||
TAOS_CHECK_RETURN(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
int64_t suid = 0;
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid));
|
||||
if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) {
|
||||
TAOS_CHECK_RETURN(terrno);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
//int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){
|
||||
// if (!tDecodeIsEnd(pDecoder)) {
|
||||
// if (pRsp->withTbName) {
|
||||
// if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) {
|
||||
// TAOS_CHECK_RETURN(terrno);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
// int64_t suid = 0;
|
||||
// TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid));
|
||||
// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) {
|
||||
// TAOS_CHECK_RETURN(terrno);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// return 0;
|
||||
//}
|
||||
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
@ -10828,9 +10828,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp));
|
||||
}
|
||||
// if (!tDecodeIsEnd(pDecoder)) {
|
||||
// TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp));
|
||||
// }
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -10844,8 +10844,8 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
|
|||
pRsp->blockSchema = NULL;
|
||||
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||
pRsp->blockTbName = NULL;
|
||||
taosArrayDestroy(pRsp->blockSuid);
|
||||
pRsp->blockSuid = NULL;
|
||||
// taosArrayDestroy(pRsp->blockSuid);
|
||||
// pRsp->blockSuid = NULL;
|
||||
tOffsetDestroy(&pRsp->reqOffset);
|
||||
tOffsetDestroy(&pRsp->rspOffset);
|
||||
}
|
||||
|
@ -10865,7 +10865,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
|||
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
|
||||
}
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp));
|
||||
// TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp));
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
@ -10897,9 +10897,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp));
|
||||
}
|
||||
// if (!tDecodeIsEnd(pDecoder)) {
|
||||
// TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp));
|
||||
// }
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
|
|
@ -77,14 +77,16 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
|
|||
tqError("failed to push tbName to blockTbName:%s", tbName);
|
||||
continue;
|
||||
}
|
||||
int64_t suid = 0;
|
||||
if(mr.me.type == TSDB_CHILD_TABLE){
|
||||
suid = mr.me.ctbEntry.suid;
|
||||
}
|
||||
if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
tqError("failed to push suid to blockSuid:%"PRId64, suid);
|
||||
continue;
|
||||
}
|
||||
// int64_t suid = 0;
|
||||
// if(mr.me.type == TSDB_CHILD_TABLE){
|
||||
// suid = mr.me.ctbEntry.suid;
|
||||
// }else{
|
||||
// suid = mr.me.uid;
|
||||
// }
|
||||
// if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
// tqError("failed to push suid to blockSuid:%"PRId64, suid);
|
||||
// continue;
|
||||
// }
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
return 0;
|
||||
|
@ -227,11 +229,11 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
int64_t suid = qExtractSuidFromTask(task);
|
||||
if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
// int64_t suid = qExtractSuidFromTask(task);
|
||||
// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
// tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId);
|
||||
// continue;
|
||||
// }
|
||||
}
|
||||
if (pRsp->withSchema) {
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
|
|
|
@ -50,11 +50,10 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
|||
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
||||
pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t));
|
||||
// pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t));
|
||||
|
||||
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
|
||||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL ||
|
||||
pRsp->blockSuid == NULL) {
|
||||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
||||
if (pRsp->blockData != NULL) {
|
||||
taosArrayDestroy(pRsp->blockData);
|
||||
pRsp->blockData = NULL;
|
||||
|
@ -75,10 +74,10 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
|||
pRsp->blockSchema = NULL;
|
||||
}
|
||||
|
||||
if (pRsp->blockSuid != NULL) {
|
||||
taosArrayDestroy(pRsp->blockSuid);
|
||||
pRsp->blockSuid = NULL;
|
||||
}
|
||||
// if (pRsp->blockSuid != NULL) {
|
||||
// taosArrayDestroy(pRsp->blockSuid);
|
||||
// pRsp->blockSuid = NULL;
|
||||
// }
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -1212,10 +1212,10 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
|||
return pTaskInfo->streamInfo.tbName;
|
||||
}
|
||||
|
||||
const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return pTaskInfo->streamInfo.suid;
|
||||
}
|
||||
//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) {
|
||||
// SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
// return pTaskInfo->streamInfo.suid;
|
||||
//}
|
||||
|
||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
@ -1499,7 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
|
||||
pTaskInfo->streamInfo.suid = mtInfo.suid;
|
||||
pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
|
||||
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
|
||||
|
|
|
@ -963,11 +963,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
ret = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
if (tFields != NULL && numFields > boundInfo->numOfBound) {
|
||||
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
|
||||
ret = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
// if (tFields != NULL && numFields > boundInfo->numOfBound) {
|
||||
// if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
|
||||
// ret = TSDB_CODE_INVALID_PARA;
|
||||
// goto end;
|
||||
// }
|
||||
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
|
||||
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
|
||||
ret = TSDB_CODE_INVALID_PARA;
|
||||
|
@ -1037,6 +1037,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
pStart += numOfRows * sizeof(int32_t);
|
||||
} else {
|
||||
pStart += BitmapLen(numOfRows);
|
||||
// for(int k = 0; k < numOfRows; k++) {
|
||||
// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){
|
||||
// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t)));
|
||||
// }
|
||||
// }
|
||||
}
|
||||
char* pData = pStart;
|
||||
|
||||
|
|
|
@ -131,14 +131,14 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 2, 1)
|
||||
|
||||
tdSql.query("select * from ct3 order by c1 desc")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkRows(5)
|
||||
tdSql.checkData(0, 1, 51)
|
||||
tdSql.checkData(0, 4, 940)
|
||||
tdSql.checkData(1, 1, 23)
|
||||
tdSql.checkData(1, 4, None)
|
||||
|
||||
tdSql.query("select * from st1 order by ts")
|
||||
tdSql.checkRows(8)
|
||||
tdSql.checkRows(14)
|
||||
tdSql.checkData(0, 1, 1)
|
||||
tdSql.checkData(1, 1, 3)
|
||||
tdSql.checkData(4, 1, 4)
|
||||
|
@ -180,7 +180,7 @@ class TDTestCase:
|
|||
tdSql.checkData(6, 8, None)
|
||||
|
||||
tdSql.query("select * from ct1")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkRows(7)
|
||||
|
||||
tdSql.query("select * from ct2")
|
||||
tdSql.checkRows(0)
|
||||
|
|
|
@ -133,7 +133,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
|||
|
||||
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
|
||||
printf("failed to create child table ct0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
@ -176,7 +176,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
|||
pRes = taos_query(
|
||||
pConn,
|
||||
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
|
||||
"'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
|
||||
"'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(1626006833703, 23, 32, 's21ds')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -190,6 +190,41 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into ct1 values(1736006813600, -32222, 43, 'ewb', 99)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 drop column c4");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into ct1 values(1736006833600, -4223, 344, 'bfs')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into ct1 values(1766006833600, -4432, 4433, 'e23wb', 9349)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
|
@ -597,6 +632,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.consume.excluded", "1");
|
||||
// tmq_conf_set(conf, "session.timeout.ms", "1000000");
|
||||
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
|
||||
|
||||
if (g_conf.snapShot) {
|
||||
|
@ -637,6 +673,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
printf("cnt:%d\n", cnt);
|
||||
msg_process(tmqmessage);
|
||||
taos_free_result(tmqmessage);
|
||||
} else {
|
||||
|
@ -845,6 +882,8 @@ void initLogFile() {
|
|||
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
|
||||
"\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\","
|
||||
"\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\","
|
||||
|
@ -992,6 +1031,8 @@ void initLogFile() {
|
|||
"{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\","
|
||||
"\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\","
|
||||
"\"colType\":8,\"colLength\":64}",
|
||||
"{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\","
|
||||
|
|
Loading…
Reference in New Issue