fix:conflicts from 3.0
This commit is contained in:
commit
77e2500a81
|
@ -91,6 +91,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
|
|||
不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
|
@ -146,7 +147,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
|
|||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
|
||||
DLL_EXPORT const char *tmq_err2str(int32_t code);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
|
@ -304,7 +304,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
|
|||
|
||||
void Close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -335,8 +334,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
对于不同编程语言,其设置方式如下:
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
<TabItem value="c" label="C">
|
||||
```c
|
||||
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
|
||||
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
|
||||
|
@ -353,8 +352,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
对于 Java 程序,还可以使用如下配置项:
|
||||
|
@ -388,7 +387,6 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
@ -502,6 +500,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
一个 consumer 支持同时订阅多个 topic。
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
|
@ -581,6 +580,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
|
@ -717,6 +717,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
|||
消费结束后,应当取消订阅。
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
|
|
|
@ -463,7 +463,8 @@ struct SStreamTask {
|
|||
struct SStreamMeta* pMeta;
|
||||
SSHashObj* pNameMap;
|
||||
void* pBackend;
|
||||
char reserve[256];
|
||||
int8_t subtableWithoutMd5;
|
||||
char reserve[255];
|
||||
};
|
||||
|
||||
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
|
||||
|
@ -532,7 +533,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
|
|||
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
|
||||
|
||||
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
|
||||
SArray* pTaskList, bool hasFillhistory);
|
||||
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
||||
void tFreeStreamTask(SStreamTask* pTask);
|
||||
|
|
|
@ -119,6 +119,8 @@ int32_t taosSetFileHandlesLimit();
|
|||
|
||||
int32_t taosLinkFile(char *src, char *dst);
|
||||
|
||||
bool lastErrorIsFileNotExist();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,8 +31,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
|
|||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
uError("create json object failed")
|
||||
return NULL;
|
||||
uError("create json object failed") return NULL;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
|
@ -131,7 +130,8 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
|
|||
cJSON* colType = cJSON_CreateNumber(field->type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
|
||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
|
@ -156,7 +156,8 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
|
|||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(field->type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
|
@ -469,7 +470,8 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
|||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
|
||||
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
|
@ -490,7 +492,8 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
|
|||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
|
||||
vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
|
||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||
cJSON_AddItemToObject(json, "colLength", cbytes);
|
||||
|
@ -731,8 +734,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
pReq.source = TD_REQ_FROM_TAOX;
|
||||
pReq.igExists = true;
|
||||
|
||||
uDebug(LOG_ID_TAG" create stable name:%s suid:%" PRId64 " processSuid:%" PRId64,
|
||||
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
|
||||
uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
|
||||
pReq.suid);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SName tableName;
|
||||
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
|
||||
|
@ -835,8 +838,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
pReq.source = TD_REQ_FROM_TAOX;
|
||||
// pReq.suid = processSuid(req.suid, pRequest->pDb);
|
||||
|
||||
uDebug(LOG_ID_TAG" drop stable name:%s suid:%" PRId64 " new suid:%" PRId64,
|
||||
LOG_ID_VALUE, req.name, req.suid, pReq.suid);
|
||||
uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
|
||||
pReq.suid);
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SName tableName = {0};
|
||||
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
|
||||
|
@ -1142,7 +1145,8 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
tb_uid_t oldSuid = pDropReq->suid;
|
||||
pDropReq->suid = pTableMeta->suid;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
uDebug(LOG_ID_TAG" drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid, pDropReq->suid);
|
||||
uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
|
||||
pDropReq->suid);
|
||||
|
||||
taosArrayPush(pRequest->tableList, &pName);
|
||||
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
|
||||
|
@ -1410,8 +1414,8 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS *taos, int rows, char *pDat
|
|||
return terrno;
|
||||
}
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d",
|
||||
LOG_ID_VALUE, rows, pData, tbname, fields, numFields);
|
||||
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
|
||||
rows, pData, tbname, fields, numFields);
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
|
|
|
@ -252,7 +252,8 @@ typedef struct SSyncCommitInfo {
|
|||
static int32_t syncAskEp(tmq_t* tmq);
|
||||
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
|
||||
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
|
||||
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet);
|
||||
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
|
||||
SMqCommitCbParamSet* pParamSet);
|
||||
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
||||
static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset);
|
||||
|
||||
|
@ -439,7 +440,8 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
|
||||
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
|
||||
SMqCommitCbParamSet* pParamSet) {
|
||||
SMqVgOffset pOffset = {0};
|
||||
|
||||
pOffset.consumerId = tmq->consumerId;
|
||||
|
@ -527,7 +529,8 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
|
||||
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam,
|
||||
int32_t rspNum) {
|
||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||
if (pParamSet == NULL) {
|
||||
return NULL;
|
||||
|
@ -561,7 +564,8 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient
|
|||
return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
|
||||
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal,
|
||||
tmq_commit_cb* pCommitFp, void* userParam) {
|
||||
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
|
||||
taosRLockLatch(&tmq->lock);
|
||||
SMqClientVg* pVg = NULL;
|
||||
|
@ -662,11 +666,13 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
|||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups);
|
||||
tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
|
||||
numOfVgroups);
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
|
||||
if (pVg->offsetInfo.endOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) {
|
||||
if (pVg->offsetInfo.endOffset.type > 0 &&
|
||||
!tOffsetEqual(&pVg->offsetInfo.endOffset, &pVg->offsetInfo.committedOffset)) {
|
||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pVg->offsetInfo.endOffset);
|
||||
|
||||
|
@ -675,12 +681,15 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
|||
|
||||
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopic->topicName, pParamSet);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1, numOfVgroups);
|
||||
tscError("consumer:0x%" PRIx64
|
||||
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno), j + 1,
|
||||
numOfVgroups);
|
||||
continue;
|
||||
}
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
|
||||
tscInfo("consumer:0x%" PRIx64
|
||||
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
|
||||
pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset;
|
||||
} else {
|
||||
|
@ -691,7 +700,8 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
|
|||
}
|
||||
taosRUnLockLatch(&tmq->lock);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics);
|
||||
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
|
||||
numOfTopics);
|
||||
|
||||
// request is sent
|
||||
if (pParamSet->waitingRspNum != 1) {
|
||||
|
@ -1060,7 +1070,8 @@ static void tmqMgmtInit(void) {
|
|||
}
|
||||
}
|
||||
|
||||
#define SET_ERROR_MSG(MSG) if(errstr!=NULL)snprintf(errstr,errstrLen,MSG);
|
||||
#define SET_ERROR_MSG(MSG) \
|
||||
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
|
||||
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||
if (conf == NULL) {
|
||||
SET_ERROR_MSG("configure is null")
|
||||
|
@ -1154,7 +1165,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
tFormatOffset(buf, tListLen(buf), &offset);
|
||||
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
|
||||
", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
|
||||
pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf);
|
||||
pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
|
||||
buf);
|
||||
|
||||
return pTmq;
|
||||
|
||||
|
@ -1456,7 +1468,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
STqOffsetVal offsetNew = {0};
|
||||
offsetNew.type = tmq->resetOffsetCfg;
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps,pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
|
||||
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId,
|
||||
pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
|
||||
|
||||
SMqClientVg clientVg = {
|
||||
.pollCnt = 0,
|
||||
|
@ -1495,8 +1508,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
|
||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
|
||||
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
|
||||
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
|
||||
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
|
||||
tmq->epoch, epoch, topicNumGet);
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||
}
|
||||
|
@ -1535,8 +1548,10 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
|||
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||
vgKey, buf);
|
||||
|
||||
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset,
|
||||
.commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows,
|
||||
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset,
|
||||
.seekOffset = pVgCur->offsetInfo.beginOffset,
|
||||
.commitOffset = pVgCur->offsetInfo.committedOffset,
|
||||
.numOfRows = pVgCur->numOfRows,
|
||||
.vgStatus = pVgCur->vgStatus};
|
||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||
}
|
||||
|
@ -1709,8 +1724,8 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
|
||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
|
||||
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
||||
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
||||
pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
||||
if (code != 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
@ -1751,9 +1766,10 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
|
||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId,
|
||||
tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||
if (tmq->replayEnable &&
|
||||
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
|
||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
||||
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1779,7 +1795,8 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
|
||||
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever,
|
||||
int64_t consumerId, bool hasData) {
|
||||
if (!pVg->seekUpdated) {
|
||||
tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId);
|
||||
if (hasData) pVg->offsetInfo.beginOffset = *reqOffset;
|
||||
|
@ -1820,14 +1837,16 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId);
|
||||
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||
terrno = pRspWrapper->code;
|
||||
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code));
|
||||
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
|
||||
tstrerror(pRspWrapper->code));
|
||||
taosFreeQitem(pRspWrapper);
|
||||
return NULL;
|
||||
} else {
|
||||
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
|
||||
askEp(tmq, NULL, false, true);
|
||||
}
|
||||
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code));
|
||||
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
|
||||
tstrerror(pRspWrapper->code));
|
||||
taosWLockLatch(&tmq->lock);
|
||||
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||
if (pVg) pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||
|
@ -1861,7 +1880,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
pVg->epSet = *pollRspWrapper->pEpset;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
|
||||
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever,
|
||||
tmq->consumerId, pDataRsp->blockNum != 0);
|
||||
|
||||
char buf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
|
||||
|
@ -1920,7 +1940,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset,
|
||||
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||
// build rsp
|
||||
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||
taosFreeQitem(pRspWrapper);
|
||||
|
@ -1949,7 +1970,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
|
||||
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset,
|
||||
pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId,
|
||||
pollRspWrapper->taosxRsp.blockNum != 0);
|
||||
|
||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||
|
@ -2316,7 +2339,8 @@ static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){
|
|||
}
|
||||
|
||||
if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) {
|
||||
tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd);
|
||||
tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value,
|
||||
offset->walVerBegin, offset->walVerEnd);
|
||||
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
|
@ -2370,12 +2394,14 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
|
|||
tsem_destroy(&pInfo->sem);
|
||||
taosMemoryFree(pInfo);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
|
||||
offset, tstrerror(code));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
|
||||
void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb* cb,
|
||||
void* param) {
|
||||
int32_t code = 0;
|
||||
if (tmq == NULL || pTopicName == NULL) {
|
||||
tscError("invalid tmq handle, null");
|
||||
|
@ -2407,7 +2433,8 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i
|
|||
|
||||
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId,
|
||||
offset, tstrerror(code));
|
||||
|
||||
end:
|
||||
if (code != 0 && cb != NULL) {
|
||||
|
@ -2431,8 +2458,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId,
|
||||
head->epoch, epoch);
|
||||
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
||||
if (pParam->sync) {
|
||||
SMqAskEpRsp rsp = {0};
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||
|
@ -2578,10 +2604,12 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
|||
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
if (waitingRspNum == 0) {
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId);
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
|
||||
vgId);
|
||||
tmqCommitDone(pParamSet);
|
||||
} else {
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum);
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
|
||||
waitingRspNum);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2590,7 +2618,8 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
|
|||
pRspObj->resIter++;
|
||||
|
||||
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
|
||||
SRetrieveTableRspForTmq* pRetrieveTmq = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
|
||||
SRetrieveTableRspForTmq* pRetrieveTmq =
|
||||
(SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
|
||||
if (pRspObj->rsp.withSchema) {
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
|
||||
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
|
||||
|
@ -2856,13 +2885,15 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
|
|||
|
||||
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
|
||||
if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) {
|
||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type);
|
||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
|
||||
pOffsetInfo->endOffset.type);
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
||||
}
|
||||
|
||||
if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) {
|
||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type);
|
||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId,
|
||||
pOffsetInfo->committedOffset.type);
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
||||
}
|
||||
|
@ -2939,8 +2970,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
||||
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
||||
pAssignment->vgId = pClientVg->vgId;
|
||||
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
|
||||
pAssignment->vgId, pAssignment->currentOffset);
|
||||
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId,
|
||||
pAssignment->currentOffset);
|
||||
}
|
||||
|
||||
if (needFetch) {
|
||||
|
@ -3049,7 +3080,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
}
|
||||
|
||||
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
||||
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);
|
||||
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName,
|
||||
p->vgId, p->currentOffset);
|
||||
|
||||
pOffsetInfo->walVerBegin = p->begin;
|
||||
pOffsetInfo->walVerEnd = p->end;
|
||||
|
@ -3087,7 +3119,8 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if there is no data to poll
|
||||
// seek interface have to send msg to server to cancel push handle if needed, because consumer may be in wait status if
|
||||
// there is no data to poll
|
||||
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
|
||||
if (tmq == NULL || pTopicName == NULL) {
|
||||
tscError("invalid tmq handle, null");
|
||||
|
|
|
@ -699,6 +699,7 @@ typedef struct {
|
|||
int64_t checkpointId;
|
||||
|
||||
int32_t indexForMultiAggBalance;
|
||||
int8_t subTableWithoutMd5;
|
||||
char reserve[256];
|
||||
|
||||
} SStreamObj;
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#define MND_STREAM_RESERVE_SIZE 64
|
||||
#define MND_STREAM_VER_NUMBER 4
|
||||
#define MND_STREAM_VER_NUMBER 5
|
||||
|
||||
#define MND_STREAM_CREATE_NAME "stream-create"
|
||||
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
||||
|
|
|
@ -15,13 +15,13 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndConsumer.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tcompare.h"
|
||||
#include "tname.h"
|
||||
|
||||
|
@ -71,7 +71,8 @@ void mndCleanupConsumer(SMnode *pMnode) {}
|
|||
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) {
|
||||
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||
if (pClearMsg == NULL) {
|
||||
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
|
||||
mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId,
|
||||
(int32_t)sizeof(SMqConsumerClearMsg));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -88,7 +89,8 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo*
|
|||
return;
|
||||
}
|
||||
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
|
||||
bool enableReplay) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -503,7 +505,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
pHead->walsver = 0;
|
||||
pHead->walever = 0;
|
||||
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqAskEpRsp(&abuf, &rsp);
|
||||
|
||||
|
@ -566,7 +567,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
|
||||
SArray *pTopicList = subscribe.topicNames;
|
||||
taosArraySort(pTopicList, taosArrayCompareString);
|
||||
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
||||
|
@ -792,9 +792,8 @@ CM_DECODE_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
|
||||
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
|
||||
pConsumer->epoch);
|
||||
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup,
|
||||
pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch);
|
||||
pConsumer->subscribeTime = pConsumer->createTime;
|
||||
return 0;
|
||||
}
|
||||
|
@ -903,9 +902,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
//
|
||||
// int32_t prevStatus = pOldConsumer->status;
|
||||
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
||||
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
|
||||
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
|
||||
// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ",
|
||||
// reb-removed-topics:%d",
|
||||
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus),
|
||||
// mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime,
|
||||
// (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -1091,7 +1092,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
|||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
|
||||
|
||||
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName,
|
||||
pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
|
||||
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
|
|
@ -85,6 +85,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
|
||||
// 3.0.50 ver = 3
|
||||
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->subTableWithoutMd5) < 0) return -1;
|
||||
|
||||
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
|
||||
|
||||
|
@ -168,6 +169,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
|
|||
if (sver >= 3) {
|
||||
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
|
||||
}
|
||||
|
||||
if (sver >= 5) {
|
||||
if (tDecodeI8(pDecoder, &pObj->subTableWithoutMd5) < 0) return -1;
|
||||
}
|
||||
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
|
|
@ -221,8 +221,8 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
|
|||
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||
|
||||
SStreamTask* pTask =
|
||||
tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList,
|
||||
pStream->conf.fillHistory, pStream->subTableWithoutMd5);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
|
@ -326,7 +326,7 @@ static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool is
|
|||
|
||||
SStreamTask* pTask =
|
||||
tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
|
||||
if (pTask == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
|||
|
||||
SStreamTask* pAggTask =
|
||||
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||
*pTaskList, pStream->conf.fillHistory);
|
||||
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
|
||||
if (pAggTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
|
|
@ -567,6 +567,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.conf.triggerParam = pCreate->maxDelay;
|
||||
streamObj.ast = taosStrdup(smaObj.ast);
|
||||
streamObj.indexForMultiAggBalance = -1;
|
||||
streamObj.subTableWithoutMd5 = 1;
|
||||
|
||||
// check the maxDelay
|
||||
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
*/
|
||||
|
||||
#include "tq.h"
|
||||
#include "vnd.h"
|
||||
#include "tqCommon.h"
|
||||
#include "vnd.h"
|
||||
|
||||
// 0: not init
|
||||
// 1: already inited
|
||||
|
@ -862,9 +862,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
|
||||
|
||||
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||
|
@ -1192,8 +1190,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
streamProcessCheckpointSourceReq(pTask, &req);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d",
|
||||
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
|
||||
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr,
|
||||
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
|
||||
|
||||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -207,8 +207,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
|
|||
goto END;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
|
||||
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
|
||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64 " 0x%" PRIx64,
|
||||
vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
|
||||
|
||||
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||
code = walFetchBody(pHandle->pWalReader);
|
||||
|
@ -322,7 +322,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
|
||||
void* data = taosMemoryMalloc(len);
|
||||
if (data == NULL) {
|
||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
|
||||
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
|
||||
// retry
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = code;
|
||||
|
||||
|
@ -387,8 +388,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
|||
pReader->nextBlk = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < numOfBlocks) {
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk,
|
||||
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
|
||||
pReader->msg.ver);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
if ((pSubmitTbData->source & sourceExcluded) != 0) {
|
||||
|
@ -448,17 +449,11 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
SWalReader* tqGetWalReader(STqReader* pReader) {
|
||||
return pReader->pWalReader;
|
||||
}
|
||||
SWalReader* tqGetWalReader(STqReader* pReader) { return pReader->pWalReader; }
|
||||
|
||||
SSDataBlock* tqGetResultBlock (STqReader* pReader) {
|
||||
return pReader->pResBlock;
|
||||
}
|
||||
SSDataBlock* tqGetResultBlock(STqReader* pReader) { return pReader->pResBlock; }
|
||||
|
||||
int64_t tqGetResultBlockTime(STqReader *pReader){
|
||||
return pReader->lastTs;
|
||||
}
|
||||
int64_t tqGetResultBlockTime(STqReader* pReader) { return pReader->lastTs; }
|
||||
|
||||
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||
if (pReader->msg.msgStr == NULL) {
|
||||
|
@ -1027,9 +1022,7 @@ bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
|
|||
return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
|
||||
}
|
||||
|
||||
bool tqCurrentBlockConsumed(const STqReader* pReader) {
|
||||
return pReader->msg.msgStr == NULL;
|
||||
}
|
||||
bool tqCurrentBlockConsumed(const STqReader* pReader) { return pReader->msg.msgStr == NULL; }
|
||||
|
||||
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
|
@ -1071,9 +1064,11 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
if (isAdd) {
|
||||
SArray* list = NULL;
|
||||
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
|
||||
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
|
||||
&list, pTqHandle->execHandle.task);
|
||||
if (ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
|
||||
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
|
||||
pTqHandle->consumerId);
|
||||
taosArrayDestroy(list);
|
||||
taosHashCancelIterate(pTq->pHandle, pIter);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
|
|
|
@ -250,7 +250,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows,
|
||||
int8_t sourceExcluded) {
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
SArray* pSchemas = taosArrayInit(0, sizeof(void*));
|
||||
|
|
|
@ -262,7 +262,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
|||
}
|
||||
|
||||
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
|
||||
taosArrayPush(reqs.pArray, pCreateTbReq);
|
||||
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
|
||||
|
@ -374,7 +374,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
|||
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
||||
|
||||
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -668,8 +668,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
||||
} else {
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName) &&
|
||||
!alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
|
||||
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||
buildCtbNameAddGruopId(dstTableName, groupId);
|
||||
}
|
||||
}
|
||||
|
@ -712,8 +712,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||
|
||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock,
|
||||
pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
||||
pTableData->pCreateTbReq =
|
||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
|
||||
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
|
||||
taosArrayDestroy(pTagArray);
|
||||
|
||||
if (pTableData->pCreateTbReq == NULL) {
|
||||
|
|
|
@ -516,7 +516,7 @@ cmd ::= SHOW VNODES.
|
|||
// show alive
|
||||
cmd ::= SHOW db_name_cond_opt(A) ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, A, QUERY_NODE_SHOW_DB_ALIVE_STMT); }
|
||||
cmd ::= SHOW CLUSTER ALIVE. { pCxt->pRootNode = createShowAliveStmt(pCxt, NULL, QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) VIEWS. { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_VIEWS_STMT, A, NULL, OP_TYPE_LIKE); }
|
||||
cmd ::= SHOW db_name_cond_opt(A) VIEWS like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_VIEWS_STMT, A, B, OP_TYPE_LIKE); }
|
||||
cmd ::= SHOW CREATE VIEW full_table_name(A). { pCxt->pRootNode = createShowCreateViewStmt(pCxt, QUERY_NODE_SHOW_CREATE_VIEW_STMT, A); }
|
||||
cmd ::= SHOW COMPACTS. { pCxt->pRootNode = createShowCompactsStmt(pCxt, QUERY_NODE_SHOW_COMPACTS_STMT); }
|
||||
cmd ::= SHOW COMPACT NK_INTEGER(A). { pCxt->pRootNode = createShowCompactDetailsStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A)); }
|
||||
|
|
|
@ -9399,7 +9399,20 @@ static int32_t createOperatorNode(EOperatorType opType, const char* pColName, SN
|
|||
}
|
||||
|
||||
static const char* getTbNameColName(ENodeType type) {
|
||||
return (QUERY_NODE_SHOW_STABLES_STMT == type ? "stable_name" : "table_name");
|
||||
const char* colName;
|
||||
switch (type)
|
||||
{
|
||||
case QUERY_NODE_SHOW_VIEWS_STMT:
|
||||
colName = "view_name";
|
||||
break;
|
||||
case QUERY_NODE_SHOW_STABLES_STMT:
|
||||
colName = "stable_name";
|
||||
break;
|
||||
default:
|
||||
colName = "table_name";
|
||||
break;
|
||||
}
|
||||
return colName;
|
||||
}
|
||||
|
||||
static int32_t createLogicCondNode(SNode* pCond1, SNode* pCond2, SNode** pCond, ELogicConditionType logicCondType) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -66,7 +66,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
|
|||
return true;
|
||||
}
|
||||
|
||||
if ((*pJob->chkKillFp)(pJob->chkKillParam)) {
|
||||
if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
|
||||
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -54,9 +54,8 @@
|
|||
|
||||
namespace {
|
||||
|
||||
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize,
|
||||
int32_t rspCode);
|
||||
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode);
|
||||
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode);
|
||||
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
|
||||
|
||||
int64_t insertJobRefId = 0;
|
||||
int64_t queryJobRefId = 0;
|
||||
|
@ -67,7 +66,7 @@ uint64_t schtQueryId = 1;
|
|||
|
||||
bool schtTestStop = false;
|
||||
bool schtTestDeadLoop = false;
|
||||
int32_t schtTestMTRunSec = 10;
|
||||
int32_t schtTestMTRunSec = 1;
|
||||
int32_t schtTestPrintNum = 1000;
|
||||
int32_t schtStartFetch = 0;
|
||||
|
||||
|
@ -85,10 +84,69 @@ void schtInitLogFile() {
|
|||
}
|
||||
|
||||
void schtQueryCb(SExecResult *pResult, void *param, int32_t code) {
|
||||
assert(TSDB_CODE_SUCCESS == code);
|
||||
*(int32_t *)param = 1;
|
||||
}
|
||||
|
||||
int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) {
|
||||
SQueryTableRsp rsp = {0};
|
||||
rsp.code = 0;
|
||||
rsp.affectedRows = 0;
|
||||
rsp.tbVerInfo = NULL;
|
||||
|
||||
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
|
||||
if (msgSize < 0) {
|
||||
qError("tSerializeSQueryTableRsp failed");
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
void *pRsp = taosMemoryCalloc(msgSize, 1);
|
||||
if (NULL == pRsp) {
|
||||
qError("rpcMallocCont %d failed", msgSize);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
|
||||
qError("tSerializeSQueryTableRsp %d failed", msgSize);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*rspMsg = pRsp;
|
||||
*msize = msgSize;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) {
|
||||
SRetrieveTableRsp* rsp = (SRetrieveTableRsp*)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1);
|
||||
rsp->completed = 1;
|
||||
rsp->numOfRows = 10;
|
||||
rsp->compLen = 0;
|
||||
|
||||
*rspMsg = rsp;
|
||||
*msize = sizeof(SRetrieveTableRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) {
|
||||
SSubmitRsp2 submitRsp = {0};
|
||||
int32_t msgSize = 0, ret = 0;
|
||||
SEncoder ec = {0};
|
||||
|
||||
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
|
||||
void* msg = taosMemoryCalloc(1, msgSize);
|
||||
tEncoderInit(&ec, (uint8_t*)msg, msgSize);
|
||||
tEncodeSSubmitRsp2(&ec, &submitRsp);
|
||||
tEncoderClear(&ec);
|
||||
|
||||
*rspMsg = msg;
|
||||
*msize = msgSize;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void schtBuildQueryDag(SQueryPlan *dag) {
|
||||
uint64_t qId = schtQueryId;
|
||||
|
||||
|
@ -98,8 +156,8 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
|
||||
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
|
||||
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
|
||||
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
|
||||
scanPlan->id.queryId = qId;
|
||||
scanPlan->id.groupId = 0x0000000000000002;
|
||||
|
@ -113,7 +171,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
scanPlan->pChildren = NULL;
|
||||
scanPlan->level = 1;
|
||||
scanPlan->pParents = nodesMakeList();
|
||||
scanPlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
|
||||
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
scanPlan->msgType = TDMT_SCH_QUERY;
|
||||
|
||||
mergePlan->id.queryId = qId;
|
||||
|
@ -125,7 +183,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
|
||||
mergePlan->pChildren = nodesMakeList();
|
||||
mergePlan->pParents = NULL;
|
||||
mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
|
||||
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
|
||||
mergePlan->msgType = TDMT_SCH_QUERY;
|
||||
|
||||
merge->pNodeList = nodesMakeList();
|
||||
|
@ -151,8 +209,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
|
||||
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
|
||||
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
|
||||
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
|
||||
merge->pNodeList = nodesMakeList();
|
||||
scan->pNodeList = nodesMakeList();
|
||||
|
@ -160,29 +217,30 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
mergePlan->pChildren = nodesMakeList();
|
||||
|
||||
for (int32_t i = 0; i < scanPlanNum; ++i) {
|
||||
scanPlan[i].id.queryId = qId;
|
||||
scanPlan[i].id.groupId = 0x0000000000000002;
|
||||
scanPlan[i].id.subplanId = 0x0000000000000003 + i;
|
||||
scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;
|
||||
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
scanPlan->id.queryId = qId;
|
||||
scanPlan->id.groupId = 0x0000000000000002;
|
||||
scanPlan->id.subplanId = 0x0000000000000003 + i;
|
||||
scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
|
||||
scanPlan[i].execNode.nodeId = 1 + i;
|
||||
scanPlan[i].execNode.epSet.inUse = 0;
|
||||
scanPlan[i].execNodeStat.tableNum = taosRand() % 30;
|
||||
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep0", 6030);
|
||||
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep1", 6030);
|
||||
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep2", 6030);
|
||||
scanPlan[i].execNode.epSet.inUse = taosRand() % 3;
|
||||
scanPlan->execNode.nodeId = 1 + i;
|
||||
scanPlan->execNode.epSet.inUse = 0;
|
||||
scanPlan->execNodeStat.tableNum = taosRand() % 30;
|
||||
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
|
||||
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep1", 6030);
|
||||
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep2", 6030);
|
||||
scanPlan->execNode.epSet.inUse = taosRand() % 3;
|
||||
|
||||
scanPlan[i].pChildren = NULL;
|
||||
scanPlan[i].level = 1;
|
||||
scanPlan[i].pParents = nodesMakeList();
|
||||
scanPlan[i].pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
|
||||
scanPlan[i].msgType = TDMT_SCH_QUERY;
|
||||
scanPlan->pChildren = NULL;
|
||||
scanPlan->level = 1;
|
||||
scanPlan->pParents = nodesMakeList();
|
||||
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
scanPlan->msgType = TDMT_SCH_QUERY;
|
||||
|
||||
nodesListAppend(scanPlan[i].pParents, (SNode *)mergePlan);
|
||||
nodesListAppend(mergePlan->pChildren, (SNode *)(scanPlan + i));
|
||||
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
|
||||
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
|
||||
|
||||
nodesListAppend(scan->pNodeList, (SNode *)(scanPlan + i));
|
||||
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
|
||||
}
|
||||
|
||||
mergePlan->id.queryId = qId;
|
||||
|
@ -193,7 +251,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
mergePlan->execNode.epSet.numOfEps = 0;
|
||||
|
||||
mergePlan->pParents = NULL;
|
||||
mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
|
||||
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
|
||||
mergePlan->msgType = TDMT_SCH_QUERY;
|
||||
|
||||
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
|
||||
|
@ -211,45 +269,50 @@ void schtBuildInsertDag(SQueryPlan *dag) {
|
|||
dag->numOfSubplans = 2;
|
||||
dag->pSubplans = nodesMakeList();
|
||||
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
|
||||
SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan));
|
||||
|
||||
insertPlan[0].id.queryId = qId;
|
||||
insertPlan[0].id.groupId = 0x0000000000000003;
|
||||
insertPlan[0].id.subplanId = 0x0000000000000004;
|
||||
insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
|
||||
insertPlan[0].level = 0;
|
||||
|
||||
insertPlan[0].execNode.nodeId = 1;
|
||||
insertPlan[0].execNode.epSet.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
|
||||
|
||||
insertPlan[0].pChildren = NULL;
|
||||
insertPlan[0].pParents = NULL;
|
||||
insertPlan[0].pNode = NULL;
|
||||
insertPlan[0].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
|
||||
insertPlan[0].msgType = TDMT_VND_SUBMIT;
|
||||
|
||||
insertPlan[1].id.queryId = qId;
|
||||
insertPlan[1].id.groupId = 0x0000000000000003;
|
||||
insertPlan[1].id.subplanId = 0x0000000000000005;
|
||||
insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
|
||||
insertPlan[1].level = 0;
|
||||
|
||||
insertPlan[1].execNode.nodeId = 1;
|
||||
insertPlan[1].execNode.epSet.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
|
||||
|
||||
insertPlan[1].pChildren = NULL;
|
||||
insertPlan[1].pParents = NULL;
|
||||
insertPlan[1].pNode = NULL;
|
||||
insertPlan[1].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
|
||||
insertPlan[1].msgType = TDMT_VND_SUBMIT;
|
||||
|
||||
inserta->pNodeList = nodesMakeList();
|
||||
|
||||
SSubplan *insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
|
||||
insertPlan->id.queryId = qId;
|
||||
insertPlan->id.groupId = 0x0000000000000003;
|
||||
insertPlan->id.subplanId = 0x0000000000000004;
|
||||
insertPlan->subplanType = SUBPLAN_TYPE_MODIFY;
|
||||
insertPlan->level = 0;
|
||||
|
||||
insertPlan->execNode.nodeId = 1;
|
||||
insertPlan->execNode.epSet.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030);
|
||||
|
||||
insertPlan->pChildren = NULL;
|
||||
insertPlan->pParents = NULL;
|
||||
insertPlan->pNode = NULL;
|
||||
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
||||
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
|
||||
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
||||
insertPlan->msgType = TDMT_VND_SUBMIT;
|
||||
|
||||
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
||||
insertPlan += 1;
|
||||
|
||||
insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||
|
||||
insertPlan->id.queryId = qId;
|
||||
insertPlan->id.groupId = 0x0000000000000003;
|
||||
insertPlan->id.subplanId = 0x0000000000000005;
|
||||
insertPlan->subplanType = SUBPLAN_TYPE_MODIFY;
|
||||
insertPlan->level = 0;
|
||||
|
||||
insertPlan->execNode.nodeId = 1;
|
||||
insertPlan->execNode.epSet.inUse = 0;
|
||||
addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030);
|
||||
|
||||
insertPlan->pChildren = NULL;
|
||||
insertPlan->pParents = NULL;
|
||||
insertPlan->pNode = NULL;
|
||||
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
||||
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
|
||||
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
||||
insertPlan->msgType = TDMT_VND_SUBMIT;
|
||||
|
||||
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
||||
|
||||
nodesListAppend(dag->pSubplans, (SNode *)inserta);
|
||||
|
@ -325,7 +388,7 @@ void schtSetRpcSendRequest() {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo) {
|
||||
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, bool persistHandle, void* rpcCtx) {
|
||||
if (pInfo) {
|
||||
taosMemoryFreeClear(pInfo->param);
|
||||
taosMemoryFreeClear(pInfo->msgInfo.pData);
|
||||
|
@ -336,17 +399,17 @@ int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTr
|
|||
|
||||
void schtSetAsyncSendMsgToServer() {
|
||||
static Stub stub;
|
||||
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
|
||||
stub.set(asyncSendMsgToServerExt, schtAsyncSendMsgToServer);
|
||||
{
|
||||
#ifdef WINDOWS
|
||||
AddrAny any;
|
||||
std::map<std::string, void *> result;
|
||||
any.get_func_addr("asyncSendMsgToServer", result);
|
||||
any.get_func_addr("asyncSendMsgToServerExt", result);
|
||||
#endif
|
||||
#ifdef LINUX
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
|
||||
any.get_global_func_addr_dynsym("^asyncSendMsgToServerExt$", result);
|
||||
#endif
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, schtAsyncSendMsgToServer);
|
||||
|
@ -374,9 +437,13 @@ void *schtSendRsp(void *param) {
|
|||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SSubmitRsp rsp = {0};
|
||||
rsp.affectedRows = 10;
|
||||
schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildSubmitRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_VND_SUBMIT_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
@ -393,11 +460,13 @@ void *schtCreateFetchRspThread(void *param) {
|
|||
taosSsleep(1);
|
||||
|
||||
int32_t code = 0;
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
|
||||
rsp->completed = 1;
|
||||
rsp->numOfRows = 10;
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildFetchRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
|
||||
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
|
@ -414,7 +483,7 @@ void *schtFetchRspThread(void *aa) {
|
|||
continue;
|
||||
}
|
||||
|
||||
taosUsleep(1);
|
||||
taosUsleep(100);
|
||||
|
||||
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
|
||||
|
||||
|
@ -426,10 +495,11 @@ void *schtFetchRspThread(void *aa) {
|
|||
rsp->completed = 1;
|
||||
rsp->numOfRows = 10;
|
||||
|
||||
dataBuf.msgType = TDMT_SCH_FETCH_RSP;
|
||||
dataBuf.pData = rsp;
|
||||
dataBuf.len = sizeof(*rsp);
|
||||
|
||||
code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
|
||||
code = schHandleCallback(param, &dataBuf, 0);
|
||||
|
||||
assert(code == 0 || code);
|
||||
}
|
||||
|
@ -456,7 +526,7 @@ void *schtRunJobThread(void *aa) {
|
|||
char *dbname = "1.db1";
|
||||
char *tablename = "table1";
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SQueryPlan dag;
|
||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||
|
||||
schtInitLogFile();
|
||||
|
||||
|
@ -470,19 +540,19 @@ void *schtRunJobThread(void *aa) {
|
|||
SSchJob *pJob = NULL;
|
||||
SSchTaskCallbackParam *param = NULL;
|
||||
SHashObj *execTasks = NULL;
|
||||
SDataBuf dataBuf = {0};
|
||||
uint32_t jobFinished = 0;
|
||||
int32_t queryDone = 0;
|
||||
|
||||
while (!schtTestStop) {
|
||||
schtBuildQueryDag(&dag);
|
||||
schtBuildQueryDag(dag);
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.epSet.numOfEps = 1;
|
||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||
load.addr.epSet.eps[0].port = 6031;
|
||||
taosArrayPush(qnodeList, &load);
|
||||
|
||||
queryDone = 0;
|
||||
|
||||
|
@ -492,7 +562,7 @@ void *schtRunJobThread(void *aa) {
|
|||
req.syncReq = false;
|
||||
req.pConn = &conn;
|
||||
req.pNodeList = qnodeList;
|
||||
req.pDag = &dag;
|
||||
req.pDag = dag;
|
||||
req.sql = "select * from tb";
|
||||
req.execFp = schtQueryCb;
|
||||
req.cbParam = &queryDone;
|
||||
|
@ -503,7 +573,7 @@ void *schtRunJobThread(void *aa) {
|
|||
pJob = schAcquireJob(queryJobRefId);
|
||||
if (NULL == pJob) {
|
||||
taosArrayDestroy(qnodeList);
|
||||
schtFreeQueryDag(&dag);
|
||||
schtFreeQueryDag(dag);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -526,11 +596,14 @@ void *schtRunJobThread(void *aa) {
|
|||
SSchTask *task = (SSchTask *)pIter;
|
||||
|
||||
param->taskId = task->taskId;
|
||||
SQueryTableRsp rsp = {0};
|
||||
dataBuf.pData = &rsp;
|
||||
dataBuf.len = sizeof(rsp);
|
||||
|
||||
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleCallback(param, &msg, 0);
|
||||
assert(code == 0 || code);
|
||||
|
||||
pIter = taosHashIterate(execTasks, pIter);
|
||||
|
@ -545,11 +618,13 @@ void *schtRunJobThread(void *aa) {
|
|||
SSchTask *task = (SSchTask *)pIter;
|
||||
|
||||
param->taskId = task->taskId - 1;
|
||||
SQueryTableRsp rsp = {0};
|
||||
dataBuf.pData = &rsp;
|
||||
dataBuf.len = sizeof(rsp);
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
|
||||
code = schHandleCallback(param, &msg, 0);
|
||||
assert(code == 0 || code);
|
||||
|
||||
pIter = taosHashIterate(execTasks, pIter);
|
||||
|
@ -575,7 +650,6 @@ void *schtRunJobThread(void *aa) {
|
|||
if (0 == code) {
|
||||
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
|
||||
assert(pRsp->completed == 1);
|
||||
assert(pRsp->numOfRows == 10);
|
||||
}
|
||||
|
||||
data = NULL;
|
||||
|
@ -587,7 +661,7 @@ void *schtRunJobThread(void *aa) {
|
|||
taosHashCleanup(execTasks);
|
||||
taosArrayDestroy(qnodeList);
|
||||
|
||||
schtFreeQueryDag(&dag);
|
||||
schtFreeQueryDag(dag);
|
||||
|
||||
if (++jobFinished % schtTestPrintNum == 0) {
|
||||
printf("jobFinished:%d\n", jobFinished);
|
||||
|
@ -609,6 +683,7 @@ void *schtFreeJobThread(void *aa) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST(queryTest, normalCase) {
|
||||
|
@ -618,21 +693,20 @@ TEST(queryTest, normalCase) {
|
|||
char *tablename = "table1";
|
||||
SVgroupInfo vgInfo = {0};
|
||||
int64_t job = 0;
|
||||
SQueryPlan dag;
|
||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||
|
||||
memset(&dag, 0, sizeof(dag));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.epSet.numOfEps = 1;
|
||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||
load.addr.epSet.eps[0].port = 6031;
|
||||
taosArrayPush(qnodeList, &load);
|
||||
|
||||
int32_t code = schedulerInit();
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
schtBuildQueryDag(&dag);
|
||||
schtBuildQueryDag(dag);
|
||||
|
||||
schtSetPlanToString();
|
||||
schtSetExecNode();
|
||||
|
@ -645,7 +719,7 @@ TEST(queryTest, normalCase) {
|
|||
SSchedulerReq req = {0};
|
||||
req.pConn = &conn;
|
||||
req.pNodeList = qnodeList;
|
||||
req.pDag = &dag;
|
||||
req.pDag = dag;
|
||||
req.sql = "select * from tb";
|
||||
req.execFp = schtQueryCb;
|
||||
req.cbParam = &queryDone;
|
||||
|
@ -659,8 +733,13 @@ TEST(queryTest, normalCase) {
|
|||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
|
@ -669,11 +748,18 @@ TEST(queryTest, normalCase) {
|
|||
pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
if (JOB_TASK_STATUS_EXEC == task->status) {
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
|
@ -703,18 +789,12 @@ TEST(queryTest, normalCase) {
|
|||
ASSERT_EQ(pRsp->numOfRows, 10);
|
||||
taosMemoryFreeClear(data);
|
||||
|
||||
data = NULL;
|
||||
code = schedulerFetchRows(job, &req);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_TRUE(data == NULL);
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
schedulerDestroy();
|
||||
|
||||
schedulerFreeJob(&job, 0);
|
||||
|
||||
schtFreeQueryDag(&dag);
|
||||
|
||||
schedulerDestroy();
|
||||
}
|
||||
|
||||
TEST(queryTest, readyFirstCase) {
|
||||
|
@ -724,21 +804,20 @@ TEST(queryTest, readyFirstCase) {
|
|||
char *tablename = "table1";
|
||||
SVgroupInfo vgInfo = {0};
|
||||
int64_t job = 0;
|
||||
SQueryPlan dag;
|
||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||
|
||||
memset(&dag, 0, sizeof(dag));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.epSet.numOfEps = 1;
|
||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||
load.addr.epSet.eps[0].port = 6031;
|
||||
taosArrayPush(qnodeList, &load);
|
||||
|
||||
int32_t code = schedulerInit();
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
schtBuildQueryDag(&dag);
|
||||
schtBuildQueryDag(dag);
|
||||
|
||||
schtSetPlanToString();
|
||||
schtSetExecNode();
|
||||
|
@ -751,7 +830,7 @@ TEST(queryTest, readyFirstCase) {
|
|||
SSchedulerReq req = {0};
|
||||
req.pConn = &conn;
|
||||
req.pNodeList = qnodeList;
|
||||
req.pDag = &dag;
|
||||
req.pDag = dag;
|
||||
req.sql = "select * from tb";
|
||||
req.execFp = schtQueryCb;
|
||||
req.cbParam = &queryDone;
|
||||
|
@ -764,8 +843,13 @@ TEST(queryTest, readyFirstCase) {
|
|||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
|
@ -775,10 +859,18 @@ TEST(queryTest, readyFirstCase) {
|
|||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
if (JOB_TASK_STATUS_EXEC == task->status) {
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
|
||||
|
@ -807,18 +899,11 @@ TEST(queryTest, readyFirstCase) {
|
|||
ASSERT_EQ(pRsp->numOfRows, 10);
|
||||
taosMemoryFreeClear(data);
|
||||
|
||||
data = NULL;
|
||||
code = schedulerFetchRows(job, &req);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_TRUE(data == NULL);
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
schedulerFreeJob(&job, 0);
|
||||
|
||||
schtFreeQueryDag(&dag);
|
||||
|
||||
schedulerDestroy();
|
||||
|
||||
schedulerFreeJob(&job, 0);
|
||||
}
|
||||
|
||||
TEST(queryTest, flowCtrlCase) {
|
||||
|
@ -828,35 +913,39 @@ TEST(queryTest, flowCtrlCase) {
|
|||
char *tablename = "table1";
|
||||
SVgroupInfo vgInfo = {0};
|
||||
int64_t job = 0;
|
||||
SQueryPlan dag;
|
||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||
|
||||
schtInitLogFile();
|
||||
|
||||
taosSeedRand(taosGetTimestampSec());
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.epSet.numOfEps = 1;
|
||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||
load.addr.epSet.eps[0].port = 6031;
|
||||
taosArrayPush(qnodeList, &load);
|
||||
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
|
||||
int32_t code = schedulerInit();
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
schtBuildQueryFlowCtrlDag(&dag);
|
||||
schtBuildQueryFlowCtrlDag(dag);
|
||||
|
||||
schtSetPlanToString();
|
||||
schtSetExecNode();
|
||||
schtSetAsyncSendMsgToServer();
|
||||
|
||||
initTaskQueue();
|
||||
|
||||
int32_t queryDone = 0;
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = mockPointer;
|
||||
SSchedulerReq req = {0};
|
||||
req.pConn = &conn;
|
||||
req.pNodeList = qnodeList;
|
||||
req.pDag = &dag;
|
||||
req.pDag = dag;
|
||||
req.sql = "select * from tb";
|
||||
req.execFp = schtQueryCb;
|
||||
req.cbParam = &queryDone;
|
||||
|
@ -866,41 +955,27 @@ TEST(queryTest, flowCtrlCase) {
|
|||
|
||||
SSchJob *pJob = schAcquireJob(job);
|
||||
|
||||
bool qDone = false;
|
||||
|
||||
while (!qDone) {
|
||||
while (!queryDone) {
|
||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||
if (NULL == pIter) {
|
||||
break;
|
||||
}
|
||||
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
||||
taosHashCancelIterate(pJob->execTasks, pIter);
|
||||
if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) {
|
||||
SDataBuf msg = {0};
|
||||
void* rmsg = NULL;
|
||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||
msg.pData = rmsg;
|
||||
|
||||
if (task->lastMsgType == TDMT_SCH_QUERY) {
|
||||
SQueryTableRsp rsp = {0};
|
||||
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||
|
||||
ASSERT_EQ(code, 0);
|
||||
} else {
|
||||
qDone = true;
|
||||
break;
|
||||
}
|
||||
|
||||
pIter = NULL;
|
||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (queryDone) {
|
||||
break;
|
||||
}
|
||||
|
||||
taosUsleep(10000);
|
||||
}
|
||||
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
|
||||
|
@ -918,18 +993,11 @@ TEST(queryTest, flowCtrlCase) {
|
|||
ASSERT_EQ(pRsp->numOfRows, 10);
|
||||
taosMemoryFreeClear(data);
|
||||
|
||||
data = NULL;
|
||||
code = schedulerFetchRows(job, &req);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_TRUE(data == NULL);
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
schedulerFreeJob(&job, 0);
|
||||
|
||||
schtFreeQueryDag(&dag);
|
||||
|
||||
schedulerDestroy();
|
||||
|
||||
schedulerFreeJob(&job, 0);
|
||||
}
|
||||
|
||||
TEST(insertTest, normalCase) {
|
||||
|
@ -938,20 +1006,21 @@ TEST(insertTest, normalCase) {
|
|||
char *dbname = "1.db1";
|
||||
char *tablename = "table1";
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SQueryPlan dag;
|
||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||
uint64_t numOfRows = 0;
|
||||
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
|
||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||
|
||||
SEp qnodeAddr = {0};
|
||||
strcpy(qnodeAddr.fqdn, "qnode0.ep");
|
||||
qnodeAddr.port = 6031;
|
||||
taosArrayPush(qnodeList, &qnodeAddr);
|
||||
SQueryNodeLoad load = {0};
|
||||
load.addr.epSet.numOfEps = 1;
|
||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||
load.addr.epSet.eps[0].port = 6031;
|
||||
taosArrayPush(qnodeList, &load);
|
||||
|
||||
int32_t code = schedulerInit();
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
schtBuildInsertDag(&dag);
|
||||
schtBuildInsertDag(dag);
|
||||
|
||||
schtSetPlanToString();
|
||||
schtSetAsyncSendMsgToServer();
|
||||
|
@ -962,21 +1031,19 @@ TEST(insertTest, normalCase) {
|
|||
TdThread thread1;
|
||||
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
||||
|
||||
SExecResult res = {0};
|
||||
|
||||
int32_t queryDone = 0;
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = mockPointer;
|
||||
SSchedulerReq req = {0};
|
||||
req.pConn = &conn;
|
||||
req.pNodeList = qnodeList;
|
||||
req.pDag = &dag;
|
||||
req.pDag = dag;
|
||||
req.sql = "insert into tb values(now,1)";
|
||||
req.execFp = schtQueryCb;
|
||||
req.cbParam = NULL;
|
||||
req.cbParam = &queryDone;
|
||||
|
||||
code = schedulerExecJob(&req, &insertJobRefId);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(res.numOfRows, 20);
|
||||
|
||||
schedulerFreeJob(&insertJobRefId, 0);
|
||||
|
||||
|
@ -989,7 +1056,7 @@ TEST(multiThread, forceFree) {
|
|||
|
||||
TdThread thread1, thread2, thread3;
|
||||
taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
|
||||
taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
|
||||
// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
|
||||
taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
|
||||
|
||||
while (true) {
|
||||
|
@ -1002,7 +1069,7 @@ TEST(multiThread, forceFree) {
|
|||
}
|
||||
|
||||
schtTestStop = true;
|
||||
taosSsleep(3);
|
||||
//taosSsleep(3);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
|
|
@ -569,6 +569,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
|
||||
pTask->subtableWithoutMd5 != 1 &&
|
||||
!isAutoTableName(pDataBlock->info.parTbName) &&
|
||||
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
|
||||
groupId != 0){
|
||||
|
|
|
@ -80,7 +80,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
|
||||
SArray* pTaskList, bool hasFillhistory) {
|
||||
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5) {
|
||||
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -96,6 +96,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
|||
pTask->info.taskLevel = taskLevel;
|
||||
pTask->info.fillHistory = fillHistory;
|
||||
pTask->info.triggerParam = triggerParam;
|
||||
pTask->subtableWithoutMd5 = subtableWithoutMd5;
|
||||
|
||||
pTask->status.pSM = streamCreateStateMachine(pTask);
|
||||
if (pTask->status.pSM == NULL) {
|
||||
|
@ -205,6 +206,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
|
||||
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
|
@ -287,6 +289,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
|
|
@ -261,6 +261,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
|
|||
|
||||
taosArrayDestroy(pInfo->pTsSBFs);
|
||||
taosHashCleanup(pInfo->pMap);
|
||||
updateInfoDestoryColseWinSBF(pInfo);
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -140,11 +140,7 @@ void* rpcMallocCont(int64_t contLen) {
|
|||
return start + sizeof(STransMsgHead);
|
||||
}
|
||||
|
||||
void rpcFreeCont(void* cont) {
|
||||
if (cont == NULL) return;
|
||||
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
tTrace("rpc free cont:%p", (char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void rpcFreeCont(void* cont) { transFreeMsg(cont); }
|
||||
|
||||
void* rpcReallocCont(void* ptr, int64_t contLen) {
|
||||
if (ptr == NULL) return rpcMallocCont(contLen);
|
||||
|
|
|
@ -218,7 +218,6 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
|
|||
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||
/// NULL,cliHandleUpdate};
|
||||
|
||||
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
|
||||
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
|
||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
||||
|
@ -1950,14 +1949,6 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void destroyUserdata(STransMsg* userdata) {
|
||||
if (userdata->pCont == NULL) {
|
||||
return;
|
||||
}
|
||||
transFreeMsg(userdata->pCont);
|
||||
userdata->pCont = NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void destroyCmsg(void* arg) {
|
||||
SCliMsg* pMsg = arg;
|
||||
if (pMsg == NULL) {
|
||||
|
@ -1965,7 +1956,7 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
|
|||
}
|
||||
|
||||
transDestroyConnCtx(pMsg->ctx);
|
||||
destroyUserdata(&pMsg->msg);
|
||||
transFreeMsg(pMsg->msg.pCont);
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
|
||||
|
@ -1984,7 +1975,7 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
|||
tDebug("destroy Ahandle C");
|
||||
|
||||
transDestroyConnCtx(pMsg->ctx);
|
||||
destroyUserdata(&pMsg->msg);
|
||||
transFreeMsg(pMsg->msg.pCont);
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -87,6 +87,7 @@ void transFreeMsg(void* msg) {
|
|||
if (msg == NULL) {
|
||||
return;
|
||||
}
|
||||
tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD);
|
||||
taosMemoryFree((char*)msg - sizeof(STransMsgHead));
|
||||
}
|
||||
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
|
||||
|
|
|
@ -632,6 +632,11 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
|
|||
return writeLen;
|
||||
}
|
||||
|
||||
bool lastErrorIsFileNotExist() {
|
||||
DWORD dwError = GetLastError();
|
||||
return dwError == ERROR_FILE_NOT_FOUND;
|
||||
}
|
||||
|
||||
#else
|
||||
int taosOpenFileNotStream(const char *path, int32_t tdFileOptions) {
|
||||
int access = O_BINARY;
|
||||
|
@ -1028,6 +1033,8 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
|
|||
#endif
|
||||
}
|
||||
|
||||
bool lastErrorIsFileNotExist() { return errno == ENOENT; }
|
||||
|
||||
#endif // WINDOWS
|
||||
|
||||
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
||||
|
|
|
@ -350,7 +350,7 @@ void taosResetLog() {
|
|||
static bool taosCheckFileIsOpen(char *logFileName) {
|
||||
TdFilePtr pFile = taosOpenFile(logFileName, TD_FILE_WRITE);
|
||||
if (pFile == NULL) {
|
||||
if (errno == ENOENT) {
|
||||
if (lastErrorIsFileNotExist()) {
|
||||
return false;
|
||||
} else {
|
||||
printf("\nfailed to open log file:%s, reason:%s\n", logFileName, strerror(errno));
|
||||
|
|
|
@ -575,6 +575,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts_3405_3398_3423.py -N 3 -n 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4348-td-27939.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/backslash_g.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4467.py
|
||||
|
||||
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
|
|
|
@ -51,6 +51,22 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
$view1 = view1_name
|
||||
$view2 = view2_name
|
||||
|
||||
sql CREATE VIEW $view1 as select * from $table1
|
||||
sql CREATE VIEW $view2 AS select * from $table2
|
||||
|
||||
sql show views like 'view%'
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show views like 'view1%'
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
import random
|
||||
import itertools
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.sqlset import *
|
||||
from util import constant
|
||||
from util.common import *
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
"""Verify the jira TS-4467
|
||||
"""
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def prepareData(self):
|
||||
# db
|
||||
tdSql.execute("create database if not exists db")
|
||||
tdSql.execute("use db")
|
||||
|
||||
# table
|
||||
tdSql.execute("create table t (ts timestamp, c1 varchar(16));")
|
||||
|
||||
# insert data
|
||||
sql = "insert into t values"
|
||||
for i in range(6):
|
||||
sql += f"(now+{str(i+1)}s, '{'name' + str(i+1)}')"
|
||||
sql += ";"
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("insert data successfully")
|
||||
|
||||
def run(self):
|
||||
self.prepareData()
|
||||
|
||||
# join query with order by
|
||||
sql = "select * from t t1, (select * from t order by ts limit 5) t2 where t1.ts = t2.ts;"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(5)
|
||||
|
||||
sql = "select * from t t1, (select * from t order by ts desc limit 5) t2 where t1.ts = t2.ts;"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(5)
|
||||
|
||||
sql = "select * from t t1, (select * from t order by ts limit 5) t2 where t1.ts = t2.ts order by t1.ts;"
|
||||
tdSql.query(sql)
|
||||
res1 = tdSql.queryResult
|
||||
tdLog.debug("res1: %s" % str(res1))
|
||||
|
||||
sql = "select * from t t1, (select * from t order by ts limit 5) t2 where t1.ts = t2.ts order by t1.ts desc;"
|
||||
tdSql.query(sql)
|
||||
res2 = tdSql.queryResult
|
||||
tdLog.debug("res2: %s" % str(res2))
|
||||
assert(len(res1) == len(res2) and res1[0][0] == res2[4][0])
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue