Merge pull request #11736 from taosdata/feature/tq
feat(tmq): add db subscribe
This commit is contained in:
commit
af1b5138e6
|
@ -216,7 +216,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
|
||||||
|
|
||||||
typedef struct tmq_conf_t tmq_conf_t;
|
typedef struct tmq_conf_t tmq_conf_t;
|
||||||
typedef struct tmq_list_t tmq_list_t;
|
typedef struct tmq_list_t tmq_list_t;
|
||||||
// typedef struct tmq_message_t tmq_message_t;
|
|
||||||
|
|
||||||
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
|
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
|
||||||
|
|
||||||
|
@ -262,12 +261,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const
|
||||||
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
||||||
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
|
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
|
||||||
|
|
||||||
#if 0
|
|
||||||
// temporary used function for demo only
|
|
||||||
void tmqShowMsg(tmq_message_t *tmq_message);
|
|
||||||
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||||
|
|
||||||
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
|
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
|
||||||
|
@ -278,12 +271,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
|
|
||||||
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
|
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
|
||||||
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
|
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
|
||||||
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
|
|
||||||
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
|
|
||||||
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
|
|
||||||
#endif
|
#endif
|
||||||
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
|
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -24,16 +24,6 @@
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
|
||||||
#if 0
|
|
||||||
struct tmq_message_t {
|
|
||||||
SMqPollRsp msg;
|
|
||||||
char* topic;
|
|
||||||
SArray* res; // SArray<SReqResultInfo>
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t resIter;
|
|
||||||
};
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t tmqRspType;
|
int8_t tmqRspType;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
|
@ -770,105 +760,12 @@ _return:
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|
||||||
time_t tt;
|
|
||||||
int32_t ms = 0;
|
|
||||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
|
||||||
tt = (time_t)(val / 1000000000);
|
|
||||||
ms = val % 1000000000;
|
|
||||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
tt = (time_t)(val / 1000000);
|
|
||||||
ms = val % 1000000;
|
|
||||||
} else {
|
|
||||||
tt = (time_t)(val / 1000);
|
|
||||||
ms = val % 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* comment out as it make testcases like select_with_tags.sim fail.
|
|
||||||
but in windows, this may cause the call to localtime crash if tt < 0,
|
|
||||||
need to find a better solution.
|
|
||||||
if (tt < 0) {
|
|
||||||
tt = 0;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifdef WINDOWS
|
|
||||||
if (tt < 0) tt = 0;
|
|
||||||
#endif
|
|
||||||
if (tt <= 0 && ms < 0) {
|
|
||||||
tt--;
|
|
||||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
|
||||||
ms += 1000000000;
|
|
||||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
ms += 1000000;
|
|
||||||
} else {
|
|
||||||
ms += 1000;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct tm* ptm = taosLocalTime(&tt, NULL);
|
|
||||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
|
||||||
|
|
||||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
|
||||||
sprintf(buf + pos, ".%09d", ms);
|
|
||||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
|
||||||
sprintf(buf + pos, ".%06d", ms);
|
|
||||||
} else {
|
|
||||||
sprintf(buf + pos, ".%03d", ms);
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||||
if (tmq_message == NULL) return 0;
|
if (tmq_message == NULL) return 0;
|
||||||
SMqPollRsp* pRsp = &tmq_message->msg;
|
SMqPollRsp* pRsp = &tmq_message->msg;
|
||||||
return pRsp->skipLogNum;
|
return pRsp->skipLogNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqShowMsg(tmq_message_t* tmq_message) {
|
|
||||||
if (tmq_message == NULL) return;
|
|
||||||
|
|
||||||
static bool noPrintSchema;
|
|
||||||
char pBuf[128];
|
|
||||||
SMqPollRsp* pRsp = &tmq_message->msg;
|
|
||||||
int32_t colNum = 2;
|
|
||||||
if (!noPrintSchema) {
|
|
||||||
printf("|");
|
|
||||||
for (int32_t i = 0; i < colNum; i++) {
|
|
||||||
if (i == 0)
|
|
||||||
printf(" %25s |", pRsp->schema->pSchema[i].name);
|
|
||||||
else
|
|
||||||
printf(" %15s |", pRsp->schema->pSchema[i].name);
|
|
||||||
}
|
|
||||||
printf("\n");
|
|
||||||
printf("===============================================\n");
|
|
||||||
noPrintSchema = true;
|
|
||||||
}
|
|
||||||
int32_t sz = taosArrayGetSize(pRsp->pBlockData);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
|
||||||
printf("|");
|
|
||||||
for (int32_t k = 0; k < colNum; k++) {
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
|
||||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
|
||||||
switch (pColInfoData->info.type) {
|
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
|
||||||
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
|
||||||
printf(" %25s |", pBuf);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
case TSDB_DATA_TYPE_UINT:
|
|
||||||
printf(" %15u |", *(uint32_t*)var);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
printf("\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
tDeleteSMqCMGetSubEpRsp(&rsp);
|
tDeleteSMqCMGetSubEpRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
/*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
|
|
||||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
pRspObj->resIter = -1;
|
pRspObj->resIter = -1;
|
||||||
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
|
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
|
||||||
|
|
||||||
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
|
|
||||||
pRspObj->resInfo.totalRows = 0;
|
pRspObj->resInfo.totalRows = 0;
|
||||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||||
|
@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
|
|
||||||
tmq_message_t* rspMsg = NULL;
|
|
||||||
int64_t startTime = taosGetTimestampMs();
|
|
||||||
|
|
||||||
int64_t status = atomic_load_64(&tmq->status);
|
|
||||||
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
rspMsg = tmqSyncPollImpl(tmq, blocking_time);
|
|
||||||
if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
|
|
||||||
return rspMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (blocking_time != 0) {
|
|
||||||
int64_t endTime = taosGetTimestampMs();
|
|
||||||
if (endTime - startTime > blocking_time) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
} else
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
SMqRspObj* rspObj;
|
SMqRspObj* rspObj;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
|
// TODO
|
||||||
if (blocking_time <= 0) blocking_time = 1;
|
return TMQ_RESP_ERR__SUCCESS;
|
||||||
if (blocking_time > 1000) blocking_time = 1000;
|
|
||||||
/*blocking_time = 1;*/
|
|
||||||
|
|
||||||
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
|
||||||
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
|
|
||||||
/*printf("over1\n");*/
|
|
||||||
taosMsleep(blocking_time);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
|
||||||
if (taosArrayGetSize(pTopic->vgs) == 0) {
|
|
||||||
/*printf("over2\n");*/
|
|
||||||
taosMsleep(blocking_time);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
|
||||||
int32_t beginVgIdx = pTopic->nextVgIdx;
|
|
||||||
while (1) {
|
|
||||||
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
|
||||||
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
|
||||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
|
|
||||||
if (pReq == NULL) {
|
|
||||||
ASSERT(false);
|
|
||||||
taosMsleep(blocking_time);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
|
||||||
if (param == NULL) {
|
|
||||||
ASSERT(false);
|
|
||||||
taosMsleep(blocking_time);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
param->tmq = tmq;
|
|
||||||
param->retMsg = &tmq_message;
|
|
||||||
param->pVg = pVg;
|
|
||||||
tsem_init(¶m->rspSem, 0, 0);
|
|
||||||
|
|
||||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
|
||||||
.pData = pReq,
|
|
||||||
.len = sizeof(SMqConsumeReq),
|
|
||||||
.handle = NULL,
|
|
||||||
};
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
|
||||||
sendInfo->requestObjRefId = 0;
|
|
||||||
sendInfo->param = param;
|
|
||||||
sendInfo->fp = tmqPollCb;
|
|
||||||
|
|
||||||
/*printf("req offset: %ld\n", pReq->offset);*/
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
|
||||||
tmq->pollCnt++;
|
|
||||||
|
|
||||||
tsem_wait(¶m->rspSem);
|
|
||||||
tsem_destroy(¶m->rspSem);
|
|
||||||
taosMemoryFree(param);
|
|
||||||
|
|
||||||
if (tmq_message == NULL) {
|
|
||||||
if (beginVgIdx == pTopic->nextVgIdx) {
|
|
||||||
taosMsleep(blocking_time);
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tmq_message;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*tsem_wait(&pRequest->body.rspSem);*/
|
|
||||||
|
|
||||||
/*if (body != NULL) {*/
|
|
||||||
/*destroySendMsgInfo(body);*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
|
|
||||||
/*pRequest->code = terrno;*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
/*return pRequest;*/
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
|
|
||||||
if (tmq_topic_vgroup_list != NULL) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: change semaphore to gate
|
|
||||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
|
||||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
|
||||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
|
|
||||||
|
|
||||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
|
|
||||||
SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
|
|
||||||
if (pParam == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
pParam->tmq = tmq;
|
|
||||||
pParam->pVg = pVg;
|
|
||||||
pParam->async = async;
|
|
||||||
if (!async) tsem_init(&pParam->rspSem, 0, 0);
|
|
||||||
|
|
||||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
|
||||||
sendInfo->requestObjRefId = 0;
|
|
||||||
sendInfo->param = pParam;
|
|
||||||
sendInfo->fp = tmqCommitCb;
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
|
||||||
|
|
||||||
if (!async) tsem_wait(&pParam->rspSem);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
|
|
||||||
|
|
||||||
const char* tmq_err2str(tmq_resp_err_t err) {
|
const char* tmq_err2str(tmq_resp_err_t err) {
|
||||||
if (err == TMQ_RESP_ERR__SUCCESS) {
|
if (err == TMQ_RESP_ERR__SUCCESS) {
|
||||||
|
@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_message_destroy(TAOS_RES* res) {
|
|
||||||
if (res == NULL) return;
|
|
||||||
if (TD_RES_TMQ(res)) {
|
|
||||||
SMqRspObj* pRspObj = (SMqRspObj*)res;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
|
||||||
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||||
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
|
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows,
|
||||||
|
int16_t *pNumOfCols);
|
||||||
|
|
||||||
// need to reposition
|
// need to reposition
|
||||||
|
|
||||||
|
|
|
@ -88,22 +88,12 @@ struct STqReadHandle {
|
||||||
SSubmitMsgIter msgIter;
|
SSubmitMsgIter msgIter;
|
||||||
SSubmitBlkIter blkIter;
|
SSubmitBlkIter blkIter;
|
||||||
SMeta* pVnodeMeta;
|
SMeta* pVnodeMeta;
|
||||||
SArray* pColIdList; // SArray<int32_t>
|
SArray* pColIdList; // SArray<int16_t>
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
SSchemaWrapper* pSchemaWrapper;
|
SSchemaWrapper* pSchemaWrapper;
|
||||||
STSchema* pSchema;
|
STSchema* pSchema;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t type;
|
|
||||||
int8_t reserved[7];
|
|
||||||
union {
|
|
||||||
void* data;
|
|
||||||
int64_t wmTs;
|
|
||||||
int64_t checkpointId;
|
|
||||||
};
|
|
||||||
} STqStreamToken;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int16_t ver;
|
int16_t ver;
|
||||||
int16_t action;
|
int16_t action;
|
||||||
|
@ -155,24 +145,26 @@ typedef struct {
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
|
int8_t subType;
|
||||||
|
int8_t withTbName;
|
||||||
|
int8_t withSchema;
|
||||||
|
int8_t withTag;
|
||||||
|
int8_t withTagSchema;
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
// SRWLatch lock;
|
// SRWLatch lock;
|
||||||
SWalReadHandle* pReadHandle;
|
SWalReadHandle* pWalReader;
|
||||||
// number should be identical to fetch thread num
|
// number should be identical to fetch thread num
|
||||||
qTaskInfo_t task[4];
|
STqReadHandle* pStreamReader[4];
|
||||||
|
qTaskInfo_t task[4];
|
||||||
} STqExec;
|
} STqExec;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
// the collection of groups
|
char* path;
|
||||||
// the handle of meta kvstore
|
// STqMetaStore* tqMeta;
|
||||||
bool writeTrigger;
|
SHashObj* execs; // subKey -> tqExec
|
||||||
char* path;
|
SHashObj* pStreamTasks;
|
||||||
STqMetaStore* tqMeta;
|
SVnode* pVnode;
|
||||||
SHashObj* tqMetaNew; // subKey -> tqExec
|
SWal* pWal;
|
||||||
SHashObj* pStreamTasks;
|
|
||||||
SVnode* pVnode;
|
|
||||||
SWal* pWal;
|
|
||||||
SMeta* pVnodeMeta;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -252,7 +244,7 @@ int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac);
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
// required by vnode
|
// required by vnode
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
||||||
|
|
|
@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); }
|
||||||
|
|
||||||
void tqCleanUp() { tqPushMgrCleanUp(); }
|
void tqCleanUp() { tqPushMgrCleanUp(); }
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) {
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
pTq->pWal = pWal;
|
pTq->pWal = pWal;
|
||||||
pTq->pVnodeMeta = pVnodeMeta;
|
#if 0
|
||||||
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
|
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
|
||||||
(FTqDelete)taosMemoryFree, 0);
|
(FTqDelete)taosMemoryFree, 0);
|
||||||
if (pTq->tqMeta == NULL) {
|
if (pTq->tqMeta == NULL) {
|
||||||
taosMemoryFree(pTq);
|
taosMemoryFree(pTq);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); }
|
int tqCommit(STQ* pTq) {
|
||||||
|
// do nothing
|
||||||
|
/*return tqStorePersist(pTq->tqMeta);*/
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
|
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
|
||||||
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) +
|
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) +
|
||||||
|
@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
||||||
}
|
}
|
||||||
for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
|
for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
|
||||||
pTopic->buffer.output[j].status = 0;
|
pTopic->buffer.output[j].status = 0;
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pReadHandle,
|
.reader = pReadHandle,
|
||||||
.meta = pTq->pVnodeMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
};
|
};
|
||||||
pTopic->buffer.output[j].pReadHandle = pReadHandle;
|
pTopic->buffer.output[j].pReadHandle = pReadHandle;
|
||||||
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
||||||
|
@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
int32_t reqEpoch = pReq->epoch;
|
int32_t reqEpoch = pReq->epoch;
|
||||||
int64_t fetchOffset;
|
int64_t fetchOffset;
|
||||||
|
|
||||||
|
// get offset to fetch message
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
fetchOffset = walGetFirstVer(pTq->pWal);
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||||
|
@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey));
|
STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(pExec);
|
ASSERT(pExec);
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
|
int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
|
||||||
|
@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalReadHead* pHead;
|
SWalReadHead* pHead;
|
||||||
if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) {
|
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
|
||||||
// TODO: no more log, set timer to wait blocking time
|
// TODO: no more log, set timer to wait blocking time
|
||||||
// if data inserted during waiting, launch query and
|
// if data inserted during waiting, launch query and
|
||||||
// response to user
|
// response to user
|
||||||
|
@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
qTaskInfo_t task = pExec->task[workerId];
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
ASSERT(task);
|
qTaskInfo_t task = pExec->task[workerId];
|
||||||
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
ASSERT(task);
|
||||||
while (1) {
|
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||||
SSDataBlock* pDataBlock = NULL;
|
while (1) {
|
||||||
uint64_t ts = 0;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
uint64_t ts = 0;
|
||||||
ASSERT(0);
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
if (pDataBlock == NULL) break;
|
||||||
|
|
||||||
|
ASSERT(pDataBlock->info.rows != 0);
|
||||||
|
ASSERT(pDataBlock->info.numOfCols != 0);
|
||||||
|
|
||||||
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
|
||||||
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||||
|
pRetrieve->useconds = ts;
|
||||||
|
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
||||||
|
pRetrieve->compressed = 0;
|
||||||
|
pRetrieve->completed = 1;
|
||||||
|
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
|
||||||
|
|
||||||
|
// TODO enable compress
|
||||||
|
int32_t actualLen = 0;
|
||||||
|
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
|
||||||
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
|
ASSERT(actualLen <= dataStrLen);
|
||||||
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
|
rsp.blockNum++;
|
||||||
}
|
}
|
||||||
if (pDataBlock == NULL) break;
|
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
STqReadHandle* pReader = pExec->pStreamReader[workerId];
|
||||||
|
tqReadHandleSetMsg(pReader, pCont, 0);
|
||||||
|
while (tqNextDataBlock(pReader)) {
|
||||||
|
SSDataBlock block = {0};
|
||||||
|
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
|
||||||
|
&block.info.numOfCols) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
|
||||||
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||||
|
/*pRetrieve->useconds = 0;*/
|
||||||
|
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
||||||
|
pRetrieve->compressed = 0;
|
||||||
|
pRetrieve->completed = 1;
|
||||||
|
pRetrieve->numOfRows = htonl(block.info.rows);
|
||||||
|
|
||||||
ASSERT(pDataBlock->info.rows != 0);
|
// TODO enable compress
|
||||||
ASSERT(pDataBlock->info.numOfCols != 0);
|
int32_t actualLen = 0;
|
||||||
|
blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
|
||||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
taosArrayPush(rsp.blockDataLen, &actualLen);
|
||||||
pRetrieve->useconds = ts;
|
taosArrayPush(rsp.blockData, &buf);
|
||||||
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
rsp.blockNum++;
|
||||||
pRetrieve->compressed = 0;
|
}
|
||||||
pRetrieve->completed = 1;
|
} else {
|
||||||
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
|
ASSERT(0);
|
||||||
|
|
||||||
// TODO enable compress
|
|
||||||
int32_t actualLen = 0;
|
|
||||||
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
|
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
|
||||||
ASSERT(actualLen <= dataStrLen);
|
|
||||||
taosArrayPush(rsp.blockDataLen, &actualLen);
|
|
||||||
taosArrayPush(rsp.blockData, &buf);
|
|
||||||
rsp.blockNum++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO batch optimization
|
// TODO batch optimization:
|
||||||
|
// TODO continue scan until meeting batch requirement
|
||||||
if (rsp.blockNum != 0) break;
|
if (rsp.blockNum != 0) break;
|
||||||
rsp.skipLogNum++;
|
rsp.skipLogNum++;
|
||||||
fetchOffset++;
|
fetchOffset++;
|
||||||
|
@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
// TODO: persist meta into tdb
|
// TODO: persist meta into tdb
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req;
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
// todo lock
|
// todo lock
|
||||||
STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey));
|
STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
|
||||||
if (pExec == NULL) {
|
if (pExec == NULL) {
|
||||||
ASSERT(req.oldConsumerId == -1);
|
ASSERT(req.oldConsumerId == -1);
|
||||||
ASSERT(req.newConsumerId != -1);
|
ASSERT(req.newConsumerId != -1);
|
||||||
|
@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
pExec->consumerId = req.newConsumerId;
|
pExec->consumerId = req.newConsumerId;
|
||||||
pExec->epoch = -1;
|
pExec->epoch = -1;
|
||||||
|
|
||||||
|
pExec->subType = req.subType;
|
||||||
|
pExec->withTbName = req.withTbName;
|
||||||
|
pExec->withSchema = req.withSchema;
|
||||||
|
pExec->withTag = req.withTag;
|
||||||
|
pExec->withTagSchema = req.withTagSchema;
|
||||||
|
|
||||||
pExec->qmsg = req.qmsg;
|
pExec->qmsg = req.qmsg;
|
||||||
req.qmsg = NULL;
|
req.qmsg = NULL;
|
||||||
pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal);
|
|
||||||
|
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||||
for (int32_t i = 0; i < 4; i++) {
|
for (int32_t i = 0; i < 4; i++) {
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pReadHandle,
|
.reader = pExec->pStreamReader[i],
|
||||||
.meta = pTq->pVnodeMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
};
|
};
|
||||||
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
|
||||||
ASSERT(pExec->task[i]);
|
ASSERT(pExec->task[i]);
|
||||||
}
|
}
|
||||||
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
/*if (req.newConsumerId != -1) {*/
|
/*if (req.newConsumerId != -1) {*/
|
||||||
|
@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < parallel; i++) {
|
for (int32_t i = 0; i < parallel; i++) {
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
|
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pReadHandle,
|
.reader = pStreamReader,
|
||||||
.meta = pTq->pVnodeMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
};
|
};
|
||||||
pTask->exec.runners[i].inputHandle = pReadHandle;
|
pTask->exec.runners[i].inputHandle = pStreamReader;
|
||||||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
ASSERT(pTask->exec.runners[i].executor);
|
ASSERT(pTask->exec.runners[i].executor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) {
|
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows,
|
||||||
|
int16_t* pNumOfCols) {
|
||||||
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
/*int32_t sversion = pHandle->pBlock->sversion;*/
|
||||||
// TODO set to real sversion
|
// TODO set to real sversion
|
||||||
int32_t sversion = 0;
|
int32_t sversion = 0;
|
||||||
|
@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
|
||||||
|
|
||||||
*pNumOfRows = pHandle->pBlock->numOfRows;
|
*pNumOfRows = pHandle->pBlock->numOfRows;
|
||||||
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
|
|
||||||
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
|
||||||
|
|
||||||
if (colNumNeed > pSchemaWrapper->nCols) {
|
if (colNumNeed > pSchemaWrapper->nCols) {
|
||||||
|
@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colActual = taosArrayGetSize(*ppCols);
|
int32_t colActual = taosArrayGetSize(*ppCols);
|
||||||
|
*pNumOfCols = colActual;
|
||||||
|
|
||||||
// TODO in stream shuffle case, fetch groupId
|
// TODO in stream shuffle case, fetch groupId
|
||||||
*pGroupId = 0;
|
*pGroupId = 0;
|
||||||
|
|
|
@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
|
|
||||||
// open tq
|
// open tq
|
||||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
||||||
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
|
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
|
||||||
if (pVnode->pTq == NULL) {
|
if (pVnode->pTq == NULL) {
|
||||||
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -560,7 +560,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
SArray* pCols = NULL;
|
SArray* pCols = NULL;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows);
|
int16_t outputCol;
|
||||||
|
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &outputCol);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
|
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
Loading…
Reference in New Issue