Merge pull request #20508 from taosdata/fix/liaohj
fix(client): fix potential memory leak.
This commit is contained in:
commit
ca1dffff45
|
@ -67,9 +67,10 @@ typedef enum {
|
||||||
* Create the exec task for stream mode
|
* Create the exec task for stream mode
|
||||||
* @param pMsg
|
* @param pMsg
|
||||||
* @param SReadHandle
|
* @param SReadHandle
|
||||||
|
* @param vgId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the exec task for queue mode
|
* Create the exec task for queue mode
|
||||||
|
@ -77,7 +78,15 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
||||||
* @param SReadHandle
|
* @param SReadHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set the task Id, usually used by message queue process
|
||||||
|
* @param tinfo
|
||||||
|
* @param taskId
|
||||||
|
* @param queryId
|
||||||
|
*/
|
||||||
|
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
|
||||||
|
|
||||||
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
|
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -271,8 +271,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
SReqResultInfo *pResultInfo;
|
SReqResultInfo *pResultInfo;
|
||||||
if (msg->resIter == -1) {
|
if (msg->resIter == -1) {
|
||||||
pResultInfo = tmqGetNextResInfo(res, true);
|
pResultInfo = tmqGetNextResInfo(res, true);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
|
|
||||||
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
|
|
||||||
} else {
|
} else {
|
||||||
pResultInfo = tmqGetCurResInfo(res);
|
pResultInfo = tmqGetCurResInfo(res);
|
||||||
}
|
}
|
||||||
|
@ -287,9 +285,6 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " vgId:%d, numOfRows:%" PRId64 ", total rows:%" PRId64, msg->rsp.head.consumerId,
|
|
||||||
msg->vgId, pResultInfo->numOfRows, pResultInfo->totalRows);
|
|
||||||
|
|
||||||
doSetOneRowPtr(pResultInfo);
|
doSetOneRowPtr(pResultInfo);
|
||||||
pResultInfo->current += 1;
|
pResultInfo->current += 1;
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
|
|
|
@ -580,7 +580,10 @@ static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, t
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
taosThreadMutexLock(&tmq->lock);
|
taosThreadMutexLock(&tmq->lock);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
|
||||||
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (strcmp(pTopic->topicName, topic) != 0) {
|
if (strcmp(pTopic->topicName, topic) != 0) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -874,7 +877,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||||
|
@ -905,6 +908,8 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||||
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
||||||
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||||
|
@ -1333,10 +1338,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64
|
char buf[80];
|
||||||
" type %d, reqId:0x%" PRIx64,
|
tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
|
||||||
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType, requestId);
|
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
|
||||||
|
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
|
||||||
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
||||||
|
@ -1375,6 +1380,11 @@ CREATE_MSG_FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SVgroupSaveInfo {
|
||||||
|
STqOffsetVal offset;
|
||||||
|
int64_t numOfRows;
|
||||||
|
} SVgroupSaveInfo;
|
||||||
|
|
||||||
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
||||||
tmq_t* tmq) {
|
tmq_t* tmq) {
|
||||||
pTopic->schema = pTopicEp->schema;
|
pTopic->schema = pTopicEp->schema;
|
||||||
|
@ -1394,11 +1404,13 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||||
|
|
||||||
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
|
||||||
STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
|
||||||
|
|
||||||
|
int64_t numOfRows = 0;
|
||||||
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
|
||||||
if (pOffset != NULL) {
|
if (pInfo != NULL) {
|
||||||
offsetNew = *pOffset;
|
offsetNew = pInfo->offset;
|
||||||
|
numOfRows = pInfo->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
|
@ -1409,7 +1421,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
.emptyBlockReceiveTs = 0,
|
.emptyBlockReceiveTs = 0,
|
||||||
.numOfRows = 0,
|
.numOfRows = numOfRows,
|
||||||
};
|
};
|
||||||
|
|
||||||
taosArrayPush(pTopic->vgs, &clientVg);
|
taosArrayPush(pTopic->vgs, &clientVg);
|
||||||
|
@ -1461,7 +1473,9 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||||
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
|
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
|
||||||
pVgCur->vgId, vgKey, buf);
|
pVgCur->vgId, vgKey, buf);
|
||||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
|
|
||||||
|
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
|
||||||
|
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1602,10 +1616,11 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) {
|
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||||
pRspObj->resType = RES_TYPE__TMQ;
|
pRspObj->resType = RES_TYPE__TMQ;
|
||||||
|
|
||||||
|
(*numOfRows) = 0;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
|
@ -1624,8 +1639,8 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg)
|
||||||
for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||||
pRspObj->resInfo.totalRows += rows;
|
|
||||||
pVg->numOfRows += rows;
|
pVg->numOfRows += rows;
|
||||||
|
(*numOfRows) += rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
|
@ -1788,29 +1803,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
|
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqRspWrapper* rspWrapper = NULL;
|
SMqRspWrapper* pRspWrapper = NULL;
|
||||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
||||||
|
|
||||||
if (rspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
||||||
|
|
||||||
if (rspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType);
|
tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
|
||||||
|
|
||||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||||
taosFreeQitem(rspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
|
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
|
||||||
return NULL;
|
return NULL;
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
|
|
||||||
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
|
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
|
||||||
|
|
||||||
|
@ -1833,28 +1847,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
|
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
|
||||||
if (pDataRsp->blockNum == 0) {
|
if (pDataRsp->blockNum == 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%"PRId64" total:%"PRId64" reqId:0x%" PRIx64, tmq->consumerId,
|
||||||
pVg->vgId, buf, pollRspWrapper->reqId);
|
pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||||
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
rspWrapper = NULL;
|
|
||||||
continue;
|
|
||||||
} else { // build rsp
|
} else { // build rsp
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
|
int64_t numOfRows = 0;
|
||||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64,
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pRsp->resInfo.totalRows, pollRspWrapper->reqId);
|
tmq->totalRows += numOfRows;
|
||||||
|
|
||||||
tmq->totalRows += pRsp->resInfo.totalRows;
|
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
|
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
|
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
||||||
|
pollRspWrapper->reqId);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||||
tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmqFreeRspWrapper(rspWrapper);
|
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||||
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
|
||||||
|
@ -1868,13 +1885,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
tmqFreeRspWrapper(rspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
|
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
|
||||||
|
@ -1883,10 +1900,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||||
rspWrapper = NULL;
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
|
||||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
|
||||||
pollRspWrapper->reqId);
|
|
||||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1895,32 +1912,37 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
|
|
||||||
// build rsp
|
// build rsp
|
||||||
void* pRsp = NULL;
|
void* pRsp = NULL;
|
||||||
|
int64_t numOfRows = 0;
|
||||||
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
||||||
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
|
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
} else {
|
} else {
|
||||||
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmq->totalRows += numOfRows;
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pVg->currentOffset);
|
tFormatOffset(buf, 80, &pVg->currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId,
|
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
|
", vg total:%" PRId64 " total:%"PRId64" reqId:0x%" PRIx64,
|
||||||
|
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
|
||||||
|
tmq->totalRows, pollRspWrapper->reqId);
|
||||||
|
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
tmqFreeRspWrapper(rspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
||||||
|
|
||||||
bool reset = false;
|
bool reset = false;
|
||||||
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
|
tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
|
||||||
taosFreeQitem(rspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
if (pollIfReset && reset) {
|
if (pollIfReset && reset) {
|
||||||
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
|
||||||
tmqPollImpl(tmq, timeout);
|
tmqPollImpl(tmq, timeout);
|
||||||
|
@ -1968,7 +1990,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
|
|
||||||
if (tmqPollImpl(tmq, timeout) < 0) {
|
if (tmqPollImpl(tmq, timeout) < 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
|
||||||
/*return NULL;*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
||||||
|
@ -1997,6 +2018,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
|
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||||
|
|
||||||
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
||||||
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
||||||
if (rsp != 0) {
|
if (rsp != 0) {
|
||||||
|
|
|
@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
for(int32_t i = 0; i < 2000; i += 20) {
|
for(int32_t i = 0; i < 100; i += 20) {
|
||||||
char sql[1024] = {0};
|
char sql[1024] = {0};
|
||||||
sprintf(sql,
|
sprintf(sql,
|
||||||
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||||
|
@ -167,6 +167,80 @@ void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
|
||||||
printf("success, code:%d\n", code);
|
printf("success, code:%d\n", code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* doConsumeData(void* param) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
|
tmq_conf_set(conf, "group.id", "cgrpName12");
|
||||||
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
|
// 创建订阅 topics 列表
|
||||||
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
|
tmq_list_append(topicList, "topic_t2");
|
||||||
|
|
||||||
|
// 启动订阅
|
||||||
|
tmq_subscribe(tmq, topicList);
|
||||||
|
|
||||||
|
tmq_list_destroy(topicList);
|
||||||
|
|
||||||
|
TAOS_FIELD* fields = NULL;
|
||||||
|
int32_t numOfFields = 0;
|
||||||
|
int32_t precision = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
int32_t msgCnt = 0;
|
||||||
|
int32_t timeout = 25000;
|
||||||
|
|
||||||
|
int32_t count = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
|
if (pRes) {
|
||||||
|
char buf[1024];
|
||||||
|
|
||||||
|
const char* topicName = tmq_get_topic_name(pRes);
|
||||||
|
const char* dbName = tmq_get_db_name(pRes);
|
||||||
|
int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||||
|
|
||||||
|
printf("topic: %s\n", topicName);
|
||||||
|
printf("db: %s\n", dbName);
|
||||||
|
printf("vgroup id: %d\n", vgroupId);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_ROW row = taos_fetch_row(pRes);
|
||||||
|
if (row == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
fields = taos_fetch_fields(pRes);
|
||||||
|
numOfFields = taos_field_count(pRes);
|
||||||
|
precision = taos_result_precision(pRes);
|
||||||
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
totalRows += 1;
|
||||||
|
// printf("precision: %d, row content: %s\n", precision, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq_consumer_close(tmq);
|
||||||
|
taos_close(pConn);
|
||||||
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -188,7 +262,6 @@ TEST(clientCase, driverInit_Test) {
|
||||||
|
|
||||||
TEST(clientCase, connect_Test) {
|
TEST(clientCase, connect_Test) {
|
||||||
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||||
|
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
|
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
|
||||||
|
@ -708,7 +781,7 @@ TEST(clientCase, projection_query_tables) {
|
||||||
// }
|
// }
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use abc2");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
||||||
|
@ -730,7 +803,7 @@ TEST(clientCase, projection_query_tables) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
for (int32_t i = 0; i < 2; ++i) {
|
for (int32_t i = 0; i < 10000; ++i) {
|
||||||
printf("create table :%d\n", i);
|
printf("create table :%d\n", i);
|
||||||
createNewTable(pConn, i);
|
createNewTable(pConn, i);
|
||||||
}
|
}
|
||||||
|
@ -970,28 +1043,23 @@ TEST(clientCase, sub_db_test) {
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
printf("precision: %d, row content: %s\n", precision, buf);
|
printf("precision: %d, row content: %s\n", precision, buf);
|
||||||
}
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
// return rows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(clientCase, sub_tb_test) {
|
TEST(clientCase, sub_tb_test) {
|
||||||
|
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||||
|
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
|
|
||||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
|
||||||
// printf("failed to create topic, code:%s", taos_errstr(pRes));
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
|
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
tmq_conf_set(conf, "group.id", "cgrpName27");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
@ -1004,10 +1072,11 @@ TEST(clientCase, sub_tb_test) {
|
||||||
|
|
||||||
// 创建订阅 topics 列表
|
// 创建订阅 topics 列表
|
||||||
tmq_list_t* topicList = tmq_list_new();
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
tmq_list_append(topicList, "topic_t1");
|
tmq_list_append(topicList, "topic_t2");
|
||||||
|
|
||||||
// 启动订阅
|
// 启动订阅
|
||||||
tmq_subscribe(tmq, topicList);
|
tmq_subscribe(tmq, topicList);
|
||||||
|
|
||||||
tmq_list_destroy(topicList);
|
tmq_list_destroy(topicList);
|
||||||
|
|
||||||
TAOS_FIELD* fields = NULL;
|
TAOS_FIELD* fields = NULL;
|
||||||
|
@ -1015,7 +1084,7 @@ TEST(clientCase, sub_tb_test) {
|
||||||
int32_t precision = 0;
|
int32_t precision = 0;
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
int32_t msgCnt = 0;
|
int32_t msgCnt = 0;
|
||||||
int32_t timeout = 5000;
|
int32_t timeout = 25000;
|
||||||
|
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
|
|
||||||
|
@ -1023,7 +1092,6 @@ TEST(clientCase, sub_tb_test) {
|
||||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t rows = 0;
|
|
||||||
|
|
||||||
const char* topicName = tmq_get_topic_name(pRes);
|
const char* topicName = tmq_get_topic_name(pRes);
|
||||||
const char* dbName = tmq_get_db_name(pRes);
|
const char* dbName = tmq_get_db_name(pRes);
|
||||||
|
@ -1033,27 +1101,45 @@ TEST(clientCase, sub_tb_test) {
|
||||||
printf("db: %s\n", dbName);
|
printf("db: %s\n", dbName);
|
||||||
printf("vgroup id: %d\n", vgroupId);
|
printf("vgroup id: %d\n", vgroupId);
|
||||||
|
|
||||||
if (count ++ > 200) {
|
|
||||||
tmq_unsubscribe(tmq);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(pRes);
|
TAOS_ROW row = taos_fetch_row(pRes);
|
||||||
if (row == NULL) break;
|
if (row == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
fields = taos_fetch_fields(pRes);
|
fields = taos_fetch_fields(pRes);
|
||||||
numOfFields = taos_field_count(pRes);
|
numOfFields = taos_field_count(pRes);
|
||||||
precision = taos_result_precision(pRes);
|
precision = taos_result_precision(pRes);
|
||||||
rows++;
|
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
totalRows += 1;
|
||||||
printf("precision: %d, row content: %s\n", precision, buf);
|
printf("precision: %d, row content: %s\n", precision, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
// if ((++count) > 1) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
// return rows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmq_consumer_close(tmq);
|
||||||
|
taos_close(pConn);
|
||||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(clientCase, sub_tb_mt_test) {
|
||||||
|
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||||
|
TdThread qid[20] = {0};
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < 1; ++i) {
|
||||||
|
taosThreadCreate(&qid[i], NULL, doConsumeData, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < 4; ++i) {
|
||||||
|
taosThreadJoin(qid[i], NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -75,9 +75,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
pTask->pMsgCb = &pSnode->msgCb;
|
pTask->pMsgCb = &pSnode->msgCb;
|
||||||
|
|
||||||
pTask->startVer = ver;
|
pTask->startVer = ver;
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||||
|
@ -90,11 +88,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
|
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
};
|
};
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
|
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
|
||||||
ASSERT(pTask->exec.executor);
|
ASSERT(pTask->exec.executor);
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -180,6 +180,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
|
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
|
||||||
|
|
||||||
|
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "meta.h"
|
#include "meta.h"
|
||||||
|
#include "osMemory.h"
|
||||||
|
#include "tencode.h"
|
||||||
|
|
||||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
||||||
memset(pReader, 0, sizeof(*pReader));
|
memset(pReader, 0, sizeof(*pReader));
|
||||||
|
@ -1235,9 +1237,14 @@ END:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
int32_t ret = 0;
|
SMetaEntry oStbEntry = {0};
|
||||||
char *buf = NULL;
|
int32_t ret = -1;
|
||||||
|
char *buf = NULL;
|
||||||
|
void *pData = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
|
||||||
|
SDecoder dc = {0};
|
||||||
|
STbDbKey tbDbKey = {0};
|
||||||
STagIdxKey *pKey = NULL;
|
STagIdxKey *pKey = NULL;
|
||||||
int32_t nKey = 0;
|
int32_t nKey = 0;
|
||||||
|
|
||||||
|
@ -1249,8 +1256,34 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
pCursor->type = param->type;
|
pCursor->type = param->type;
|
||||||
|
|
||||||
metaRLock(pMeta);
|
metaRLock(pMeta);
|
||||||
|
|
||||||
|
if (tdbTbGet(pMeta->pUidIdx, ¶m->suid, sizeof(tb_uid_t), &pData, &nData) != 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
tbDbKey.uid = param->suid;
|
||||||
|
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
||||||
|
|
||||||
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
ret = metaDecodeEntry(&dc, &oStbEntry);
|
||||||
|
|
||||||
|
if (oStbEntry.stbEntry.schemaTag.pSchema == NULL || oStbEntry.stbEntry.schemaTag.pSchema == NULL) {
|
||||||
|
ret = -1;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
ret = -1;
|
||||||
|
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
|
||||||
|
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
|
||||||
|
if (schema->colId == param->cid && param->type == schema->type && (IS_IDX_ON(schema) || i == 0)) {
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ret != 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
|
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
|
||||||
if (ret < 0) {
|
if (ret != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1271,6 +1304,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
maxSize = 4 * nTagData + 1;
|
maxSize = 4 * nTagData + 1;
|
||||||
buf = taosMemoryCalloc(1, maxSize);
|
buf = taosMemoryCalloc(1, maxSize);
|
||||||
if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
|
if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
|
||||||
|
ret = -1;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1288,8 +1322,10 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
int cmp = 0;
|
int cmp = 0;
|
||||||
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
|
ret = tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp);
|
||||||
|
if (ret != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1353,6 +1389,10 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
END:
|
END:
|
||||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||||
|
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
tdbFree(pData);
|
||||||
|
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
taosMemoryFree(pKey);
|
taosMemoryFree(pKey);
|
||||||
|
|
||||||
|
|
|
@ -282,7 +282,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
.initTqReader = 1,
|
.initTqReader = 1,
|
||||||
.pStateBackend = pStreamState,
|
.pStateBackend = pStreamState,
|
||||||
};
|
};
|
||||||
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
|
||||||
if (!pRSmaInfo->taskInfo[idx]) {
|
if (!pRSmaInfo->taskInfo[idx]) {
|
||||||
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -864,7 +864,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
|
||||||
if (!dstTaskInfo) {
|
if (!dstTaskInfo) {
|
||||||
code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -296,51 +296,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// int32_t len = 0;
|
|
||||||
// int32_t code = 0;
|
|
||||||
//
|
|
||||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
|
||||||
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
|
||||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
|
||||||
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (code < 0) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
// void* buf = rpcMallocCont(tlen);
|
|
||||||
// if (buf == NULL) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// ((SMqRspHead*)buf)->mqMsgType = type;
|
|
||||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
|
||||||
//
|
|
||||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
//
|
|
||||||
// SEncoder encoder = {0};
|
|
||||||
// tEncoderInit(&encoder, abuf, len);
|
|
||||||
//
|
|
||||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
|
||||||
// tEncodeSMqDataRsp(&encoder, pRsp);
|
|
||||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
|
||||||
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tEncoderClear(&encoder);
|
|
||||||
//
|
|
||||||
// SRpcMsg rsp = {
|
|
||||||
// .info = pMsg->info,
|
|
||||||
// .pCont = buf,
|
|
||||||
// .contLen = tlen,
|
|
||||||
// .code = 0,
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// tmsgSendRsp(&rsp);
|
|
||||||
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
|
@ -348,8 +303,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%"PRIx64,
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -425,6 +380,7 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
|
||||||
|
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
|
@ -436,10 +392,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
|
|
||||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
||||||
offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
|
offset.subKey, vgId, offset.val.uid, offset.val.ts);
|
||||||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
||||||
TD_VID(pTq->pVnode), offset.val.version);
|
vgId, offset.val.version);
|
||||||
if (offset.val.version + 1 == sversion) {
|
if (offset.val.version + 1 == sversion) {
|
||||||
offset.val.version += 1;
|
offset.val.version += 1;
|
||||||
}
|
}
|
||||||
|
@ -542,6 +498,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
*pBlockReturned = false;
|
*pBlockReturned = false;
|
||||||
|
|
||||||
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
||||||
|
@ -551,12 +509,15 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
char formatBuf[80];
|
char formatBuf[80];
|
||||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
|
consumerId, pHandle->subKey, vgId, formatBuf);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||||
if (pRequest->useSnapshot) {
|
if (pRequest->useSnapshot) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
|
||||||
|
consumerId, pHandle->subKey, vgId);
|
||||||
|
|
||||||
if (pHandle->fetchMeta) {
|
if (pHandle->fetchMeta) {
|
||||||
tqOffsetResetToMeta(pOffsetVal, 0);
|
tqOffsetResetToMeta(pOffsetVal, 0);
|
||||||
} else {
|
} else {
|
||||||
|
@ -577,8 +538,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, vgId, dataRsp.rspOffset.version);
|
||||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
|
||||||
|
@ -589,16 +550,14 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
|
||||||
*pBlockReturned = true;
|
*pBlockReturned = true;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
|
||||||
" in vg %d, subkey %s, reset none failed",
|
pHandle->subKey, consumerId, vgId, pRequest->subKey);
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
|
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -639,6 +598,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
||||||
|
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
|
@ -906,7 +867,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
SVnode* pVnode = pTq->pVnode;
|
||||||
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
|
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
|
@ -935,7 +899,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->fetchMeta = req.withMeta;
|
pHandle->fetchMeta = req.withMeta;
|
||||||
|
|
||||||
// TODO version should be assigned and refed during preprocess
|
// TODO version should be assigned and refed during preprocess
|
||||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -945,8 +909,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->pRef = pRef;
|
pHandle->pRef = pRef;
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pVnode->pMeta,
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pVnode,
|
||||||
.initTableReader = true,
|
.initTableReader = true,
|
||||||
.initTqReader = true,
|
.initTqReader = true,
|
||||||
.version = ver,
|
.version = ver,
|
||||||
|
@ -959,38 +923,38 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
req.qmsg = NULL;
|
req.qmsg = NULL;
|
||||||
|
|
||||||
pHandle->execHandle.task =
|
pHandle->execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL);
|
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
|
||||||
void* scanner = NULL;
|
void* scanner = NULL;
|
||||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
||||||
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
(SSnapContext**)(&handle.sContext));
|
(SSnapContext**)(&handle.sContext));
|
||||||
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
|
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
|
||||||
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
|
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
|
||||||
}
|
}
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
(SSnapContext**)(&handle.sContext));
|
(SSnapContext**)(&handle.sContext));
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
|
@ -1043,6 +1007,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
pTask->refCnt = 1;
|
pTask->refCnt = 1;
|
||||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||||
|
|
||||||
|
@ -1055,9 +1020,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
|
|
||||||
pTask->startVer = ver;
|
pTask->startVer = ver;
|
||||||
|
|
||||||
// expand executor
|
// expand executor
|
||||||
|
@ -1077,7 +1040,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
.initTqReader = 1,
|
.initTqReader = 1,
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
};
|
};
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
|
||||||
if (pTask->exec.executor == NULL) {
|
if (pTask->exec.executor == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1092,7 +1056,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
|
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
|
||||||
.pStateBackend = pTask->pState,
|
.pStateBackend = pTask->pState,
|
||||||
};
|
};
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
|
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
|
||||||
if (pTask->exec.executor == NULL) {
|
if (pTask->exec.executor == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1122,9 +1087,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
|
||||||
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId,
|
|
||||||
pTask->selfChildId, pTask->taskLevel);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task executed, get %p", pHandle->consumerId, pTq->pVnode->config.vgId, pDataBlock);
|
||||||
|
|
||||||
// current scan should be stopped asap, since the rebalance occurs.
|
// current scan should be stopped asap, since the rebalance occurs.
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -120,7 +120,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pRsp->withTbName || pRsp->withSchema){
|
if (pRsp->withTbName || pRsp->withSchema) {
|
||||||
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,6 +275,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
|
@ -306,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
handle.execHandle.task =
|
handle.execHandle.task =
|
||||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
|
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL);
|
||||||
if (handle.execHandle.task == NULL) {
|
if (handle.execHandle.task == NULL) {
|
||||||
tqError("cannot create exec task for %s", handle.subKey);
|
tqError("cannot create exec task for %s", handle.subKey);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -331,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||||
(SSnapContext**)(&reader.sContext));
|
(SSnapContext**)(&reader.sContext));
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
|
@ -348,7 +349,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
|
||||||
}
|
}
|
||||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
|
||||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
|
|
|
@ -427,7 +427,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
|
||||||
return pTableMap;
|
return pTableMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
|
||||||
STableBlockScanInfo** p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
|
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
|
||||||
|
@ -446,6 +446,7 @@ static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
||||||
|
|
||||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
pInfo->lastKey = ts;
|
pInfo->lastKey = ts;
|
||||||
|
pInfo->lastKeyInStt = ts + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2471,7 +2472,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
initMemDataIterator(pScanInfo, pReader);
|
initMemDataIterator(pScanInfo, pReader);
|
||||||
pLBlockReader->uid = pScanInfo->uid;
|
pLBlockReader->uid = pScanInfo->uid;
|
||||||
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
|
|
||||||
STimeWindow w = pLBlockReader->window;
|
STimeWindow w = pLBlockReader->window;
|
||||||
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||||
w.skey = pScanInfo->lastKeyInStt;
|
w.skey = pScanInfo->lastKeyInStt;
|
||||||
|
@ -4457,8 +4457,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||||
// prepare for the main scan
|
// prepare for the main scan
|
||||||
int32_t code = doOpenReaderImpl(pReader);
|
code = doOpenReaderImpl(pReader);
|
||||||
resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
int32_t step = 1;
|
||||||
|
resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -4479,8 +4480,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
||||||
// prepare for the next row scan
|
// prepare for the next row scan
|
||||||
int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
|
int32_t step = -1;
|
||||||
resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
|
code = doOpenReaderImpl(pReader->innerReader[1]);
|
||||||
|
resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4683,15 +4685,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
|
tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr);
|
||||||
|
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
pReader->order = pCond->order;
|
pReader->order = pCond->order;
|
||||||
|
@ -4712,8 +4711,10 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
|
|
||||||
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
resetAllDataBlockScanInfo(pStatus->pTableMap, ts);
|
int32_t step = asc? 1:-1;
|
||||||
|
int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
||||||
|
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -4728,7 +4729,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
|
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4995,3 +4995,9 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
|
||||||
}
|
}
|
||||||
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if failed, do nothing
|
||||||
|
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
|
||||||
|
taosMemoryFreeClear(pReader->idStr);
|
||||||
|
pReader->idStr = taosStrdup(idstr);
|
||||||
|
}
|
|
@ -115,6 +115,7 @@ typedef struct STaskIdInfo {
|
||||||
uint64_t subplanId;
|
uint64_t subplanId;
|
||||||
uint64_t templateId;
|
uint64_t templateId;
|
||||||
char* str;
|
char* str;
|
||||||
|
int32_t vgId;
|
||||||
} STaskIdInfo;
|
} STaskIdInfo;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -834,8 +835,10 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||||
|
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
char* buildTaskId(uint64_t taskId, uint64_t queryId);
|
||||||
char* sql, EOPTR_EXEC_MODEL model);
|
|
||||||
|
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
|
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
|
||||||
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "query.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -1058,10 +1059,10 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
SIndexMetaArg metaArg = {
|
SIndexMetaArg metaArg = {
|
||||||
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = pIndex, .suid = pScanNode->uid};
|
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = pIndex, .suid = pScanNode->uid};
|
||||||
|
|
||||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
status = SFLT_NOT_INDEX;
|
||||||
code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status);
|
code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status);
|
||||||
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
||||||
// qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
|
qWarn("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
|
||||||
code = TDB_CODE_SUCCESS;
|
code = TDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
|
||||||
|
|
|
@ -159,6 +159,30 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doSetTaskId(SOperatorInfo* pOperator) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
SStreamScanInfo* pStreamScanInfo = pOperator->info;
|
||||||
|
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
|
||||||
|
if (pScanInfo->base.dataReader != NULL) {
|
||||||
|
tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
doSetTaskId(pOperator->pDownstream[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
|
||||||
|
SExecTaskInfo* pTaskInfo = tinfo;
|
||||||
|
pTaskInfo->id.queryId = queryId;
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pTaskInfo->id.str);
|
||||||
|
pTaskInfo->id.str = buildTaskId(taskId, queryId);
|
||||||
|
|
||||||
|
// set the idstr for tsdbReader
|
||||||
|
doSetTaskId(pTaskInfo->pRoot);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
|
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
@ -218,7 +242,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
// create raw scan
|
// create raw scan
|
||||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||||
|
@ -231,7 +255,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
|
|
||||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||||
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
|
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
|
||||||
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
|
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
|
||||||
if (NULL == pTaskInfo->pRoot) {
|
if (NULL == pTaskInfo->pRoot) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pTaskInfo);
|
taosMemoryFree(pTaskInfo);
|
||||||
|
@ -248,7 +272,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
|
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
|
@ -274,13 +298,11 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*qDebugL("stream task string %s", (const char*)msg);*/
|
|
||||||
|
|
||||||
struct SSubplan* pPlan = NULL;
|
struct SSubplan* pPlan = NULL;
|
||||||
int32_t code = qStringToSubplan(msg, &pPlan);
|
int32_t code = qStringToSubplan(msg, &pPlan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -289,7 +311,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
|
@ -468,11 +490,11 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
|
|
||||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||||
|
|
||||||
qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
|
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
|
||||||
|
|
||||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
|
int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
|
qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1101,7 +1123,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||||
|
|
||||||
qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
|
qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||||
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
@ -1140,7 +1162,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
pTableScanInfo->base.cond.twindows.skey = oldSkey;
|
pTableScanInfo->base.cond.twindows.skey = oldSkey;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||||
ts, pTableScanInfo->currentTable, numOfTables);
|
ts, pTableScanInfo->currentTable, numOfTables);
|
||||||
} else {
|
} else {
|
||||||
qError("invalid pOffset->type:%d", pOffset->type);
|
qError("invalid pOffset->type:%d", pOffset->type);
|
||||||
|
|
|
@ -1959,7 +1959,7 @@ void destroyAggOperatorInfo(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
|
char* buildTaskId(uint64_t taskId, uint64_t queryId) {
|
||||||
char* p = taosMemoryMalloc(64);
|
char* p = taosMemoryMalloc(64);
|
||||||
|
|
||||||
int32_t offset = 6;
|
int32_t offset = 6;
|
||||||
|
@ -1971,11 +1971,10 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
|
||||||
offset += tintToHex(queryId, &p[offset]);
|
offset += tintToHex(queryId, &p[offset]);
|
||||||
|
|
||||||
p[offset] = 0;
|
p[offset] = 0;
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
|
static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
|
||||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1990,6 +1989,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
||||||
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||||
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
||||||
|
|
||||||
|
pTaskInfo->id.vgId = vgId;
|
||||||
pTaskInfo->id.queryId = queryId;
|
pTaskInfo->id.queryId = queryId;
|
||||||
pTaskInfo->id.str = buildTaskId(taskId, queryId);
|
pTaskInfo->id.str = buildTaskId(taskId, queryId);
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
|
@ -2178,7 +2178,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
int32_t sz = tableListGetSize(pTableListInfo);
|
int32_t sz = tableListGetSize(pTableListInfo);
|
||||||
qDebug("create stream task, total:%d", sz);
|
qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr);
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
|
@ -2439,17 +2439,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
char* sql, EOPTR_EXEC_MODEL model) {
|
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
uint64_t queryId = pPlan->id.queryId;
|
*pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName);
|
||||||
|
|
||||||
*pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
|
|
||||||
if (*pTaskInfo == NULL) {
|
if (*pTaskInfo == NULL) {
|
||||||
goto _complete;
|
goto _complete;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
/*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
|
|
||||||
if (pHandle->pStateBackend) {
|
if (pHandle->pStateBackend) {
|
||||||
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
|
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
|
||||||
}
|
}
|
||||||
|
|
|
@ -634,7 +634,7 @@ static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxF
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
int32_t code = 0;
|
int32_t code = -1;
|
||||||
if (sifValidOp(node->opType) < 0) {
|
if (sifValidOp(node->opType) < 0) {
|
||||||
code = TSDB_CODE_QRY_INVALID_INPUT;
|
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
ctx->code = code;
|
ctx->code = code;
|
||||||
|
@ -654,7 +654,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
SIFParam *params = NULL;
|
SIFParam *params = NULL;
|
||||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||||
|
|
||||||
if (params[0].status == SFLT_NOT_INDEX && (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
|
if (params[0].status == SFLT_NOT_INDEX || (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
|
||||||
output->status = SFLT_NOT_INDEX;
|
output->status = SFLT_NOT_INDEX;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
@ -664,6 +664,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
sif_func_t operFn = sifNullFunc;
|
sif_func_t operFn = sifNullFunc;
|
||||||
|
|
||||||
if (!ctx->noExec) {
|
if (!ctx->noExec) {
|
||||||
|
code = 0;
|
||||||
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
||||||
SIF_ERR_JRET(operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output));
|
SIF_ERR_JRET(operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output));
|
||||||
} else {
|
} else {
|
||||||
|
@ -672,11 +673,17 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
output->status = SFLT_NOT_INDEX;
|
output->status = SFLT_NOT_INDEX;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
code = 0;
|
||||||
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
||||||
}
|
}
|
||||||
_return:
|
_return:
|
||||||
for (int i = 0; i < nParam; i++) sifFreeParam(¶ms[i]);
|
for (int i = 0; i < nParam; i++) sifFreeParam(¶ms[i]);
|
||||||
taosMemoryFree(params);
|
taosMemoryFree(params);
|
||||||
|
if (code != 0) {
|
||||||
|
output->status = SFLT_NOT_INDEX;
|
||||||
|
} else {
|
||||||
|
output->status = SFLT_COARSE_INDEX;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,7 +724,7 @@ _return:
|
||||||
|
|
||||||
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
|
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
|
||||||
SFunctionNode *node = (SFunctionNode *)pNode;
|
SFunctionNode *node = (SFunctionNode *)pNode;
|
||||||
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
|
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t)), .status = SFLT_COARSE_INDEX};
|
||||||
|
|
||||||
SIFCtx *ctx = context;
|
SIFCtx *ctx = context;
|
||||||
ctx->code = sifExecFunction(node, ctx, &output);
|
ctx->code = sifExecFunction(node, ctx, &output);
|
||||||
|
@ -735,7 +742,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
|
||||||
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
|
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
|
||||||
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
|
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
|
||||||
|
|
||||||
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
|
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t)), .status = SFLT_COARSE_INDEX};
|
||||||
|
|
||||||
SIFCtx *ctx = context;
|
SIFCtx *ctx = context;
|
||||||
ctx->code = sifExecLogic(node, ctx, &output);
|
ctx->code = sifExecLogic(node, ctx, &output);
|
||||||
|
@ -831,6 +838,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
|
||||||
if (res->result != NULL) {
|
if (res->result != NULL) {
|
||||||
taosArrayAddAll(pDst->result, res->result);
|
taosArrayAddAll(pDst->result, res->result);
|
||||||
}
|
}
|
||||||
|
pDst->status = res->status;
|
||||||
|
|
||||||
sifFreeParam(res);
|
sifFreeParam(res);
|
||||||
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
|
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
|
||||||
|
@ -887,16 +895,20 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
|
||||||
SFilterInfo *filter = NULL;
|
SFilterInfo *filter = NULL;
|
||||||
|
|
||||||
SArray *output = taosArrayInit(8, sizeof(uint64_t));
|
SArray *output = taosArrayInit(8, sizeof(uint64_t));
|
||||||
SIFParam param = {.arg = *metaArg, .result = output};
|
SIFParam param = {.arg = *metaArg, .result = output, .status = SFLT_NOT_INDEX};
|
||||||
int32_t code = sifCalculate((SNode *)pFilterNode, ¶m);
|
int32_t code = sifCalculate((SNode *)pFilterNode, ¶m);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sifFreeParam(¶m);
|
sifFreeParam(¶m);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
if (param.status == SFLT_NOT_INDEX) {
|
||||||
|
*status = param.status;
|
||||||
|
} else {
|
||||||
|
*status = st;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayAddAll(result, param.result);
|
taosArrayAddAll(result, param.result);
|
||||||
sifFreeParam(¶m);
|
sifFreeParam(¶m);
|
||||||
*status = st;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ class TDTestCase:
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor(), True)
|
||||||
self.setsql = TDSetSql()
|
self.setsql = TDSetSql()
|
||||||
self.column_dict = {
|
self.column_dict = {
|
||||||
'ts': 'timestamp',
|
'ts': 'timestamp',
|
||||||
|
|
Loading…
Reference in New Issue