refactor(tmq): prepare only needed
This commit is contained in:
parent
bf09787245
commit
d637ffe22c
|
@ -42,25 +42,28 @@ typedef struct SReadHandle {
|
|||
bool initTqReader;
|
||||
} SReadHandle;
|
||||
|
||||
// in queue mode, data streams are seperated by msg
|
||||
typedef enum {
|
||||
OPTR_EXEC_MODEL_BATCH = 0x1,
|
||||
OPTR_EXEC_MODEL_STREAM = 0x2,
|
||||
OPTR_EXEC_MODEL_QUEUE = 0x3,
|
||||
} EOPTR_EXEC_MODEL;
|
||||
|
||||
/**
|
||||
* Create the exec task for streaming mode
|
||||
* Create the exec task for stream mode
|
||||
* @param pMsg
|
||||
* @param streamReadHandle
|
||||
* @param SReadHandle
|
||||
* @return
|
||||
*/
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
||||
|
||||
/**
|
||||
* Switch the stream scan to snapshot mode
|
||||
* @param tinfo
|
||||
* Create the exec task for queue mode
|
||||
* @param pMsg
|
||||
* @param SReadHandle
|
||||
* @return
|
||||
*/
|
||||
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo);
|
||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers);
|
||||
|
||||
/**
|
||||
* Set the input data block for the stream scan.
|
||||
|
@ -111,7 +114,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
* @return
|
||||
*/
|
||||
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
|
||||
int32_t* tversion);
|
||||
int32_t* tversion);
|
||||
|
||||
/**
|
||||
* The main task execution function, including query on both table and multiple tables,
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
|
@ -119,6 +120,7 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue
|
|||
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
||||
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
||||
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
||||
ASSERT(0);
|
||||
ASSERT(queue->qItem != NULL);
|
||||
return streamQueueCurItem(queue);
|
||||
} else {
|
||||
|
@ -305,6 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
|
|||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||
return -1;
|
||||
}
|
||||
qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
|
||||
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
|
||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||
|
|
|
@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
|
|||
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
|
||||
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
|
||||
// clang-format on
|
||||
#define BUF_PAGE_DEBUG
|
||||
//#define BUF_PAGE_DEBUG
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1149,11 +1149,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
|
||||
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
|
||||
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
|
||||
} else {
|
||||
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
|
||||
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
|
||||
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
@ -2427,15 +2426,15 @@ static void destroyCreateTbReqBatch(void* data) {
|
|||
taosArrayDestroy(pTbBatch->req.pArray);
|
||||
}
|
||||
|
||||
static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj *pRequest = NULL;
|
||||
SQuery *pQuery = NULL;
|
||||
SHashObj *pVgroupHashmap = NULL;
|
||||
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SHashObj* pVgroupHashmap = NULL;
|
||||
|
||||
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
|
||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -2455,8 +2454,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
|
|||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
SVCreateTbReq *pCreateReq = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
SVCreateTbReq* pCreateReq = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
|
@ -2540,13 +2539,13 @@ static void destroyDropTbReqBatch(void* data) {
|
|||
taosArrayDestroy(pTbBatch->req.pArray);
|
||||
}
|
||||
|
||||
static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
|
||||
SVDropTbBatchReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj *pRequest = NULL;
|
||||
SQuery *pQuery = NULL;
|
||||
SHashObj *pVgroupHashmap = NULL;
|
||||
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||
SVDropTbBatchReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SHashObj* pVgroupHashmap = NULL;
|
||||
|
||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2568,8 +2567,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
|
|||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
SVDropTbReq *pDropReq = NULL;
|
||||
SCatalog *pCatalog = NULL;
|
||||
SVDropTbReq* pDropReq = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
|
@ -2640,17 +2639,16 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
|
||||
SVAlterTbReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj *pRequest = NULL;
|
||||
SQuery *pQuery = NULL;
|
||||
SArray *pArray = NULL;
|
||||
SVgDataBlocks *pVgData = NULL;
|
||||
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||
SVAlterTbReq req = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SArray* pArray = NULL;
|
||||
SVgDataBlocks* pVgData = NULL;
|
||||
|
||||
|
||||
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
|
||||
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
|
|
@ -1736,56 +1736,57 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "|\n", flag,
|
||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
|
||||
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag,
|
||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||
pDataBlock->info.uid);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t k = 0; k < colNum; k++) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
||||
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
continue;
|
||||
}
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
|
||||
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
}
|
||||
}
|
||||
len += snprintf(dumpBuf + len, size - len, "\n");
|
||||
if (len >= size -1) return dumpBuf;
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
}
|
||||
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
|
||||
return dumpBuf;
|
||||
|
|
|
@ -412,7 +412,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1);
|
||||
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 4, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||
|
|
|
@ -5476,6 +5476,11 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
|||
ASSERT(0);
|
||||
// TODO
|
||||
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
|
||||
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
|
||||
/*return true;*/
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -89,8 +89,6 @@ typedef struct {
|
|||
STqExecTb execTb;
|
||||
STqExecDb execDb;
|
||||
};
|
||||
// TODO remove it
|
||||
int64_t tsdbEndVer;
|
||||
|
||||
} STqExecHandle;
|
||||
|
||||
|
@ -101,6 +99,8 @@ typedef struct {
|
|||
int32_t epoch;
|
||||
int8_t fetchMeta;
|
||||
|
||||
int64_t snapshotVer;
|
||||
|
||||
// TODO remove
|
||||
SWalReader* pWalReader;
|
||||
|
||||
|
@ -131,7 +131,7 @@ typedef struct {
|
|||
static STqMgmt tqMgmt = {0};
|
||||
|
||||
// tqRead
|
||||
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
|
||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset);
|
||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
|
||||
// tqExec
|
||||
|
|
|
@ -321,7 +321,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||
fetchOffsetNew.version++;
|
||||
}
|
||||
if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||
ASSERT(0);
|
||||
code = -1;
|
||||
goto OVER;
|
||||
|
@ -480,30 +480,28 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->fetchMeta = req.withMeta;
|
||||
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
/*for (int32_t i = 0; i < 5; i++) {*/
|
||||
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
|
||||
/*}*/
|
||||
|
||||
// TODO version should be assigned in preprocess
|
||||
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
||||
pHandle->snapshotVer = ver;
|
||||
req.qmsg = NULL;
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle handle = {
|
||||
.tqReader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.initTableReader = true,
|
||||
.initTqReader = true,
|
||||
.version = ver,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
|
||||
ASSERT(scanner);
|
||||
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
|
||||
ASSERT(pHandle->execHandle.pExecReader[i]);
|
||||
pHandle->execHandle.tsdbEndVer = ver;
|
||||
}
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
|
|
|
@ -59,8 +59,9 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
qTaskInfo_t task = pExec->execCol.task[0];
|
||||
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->execCol.task[0];
|
||||
|
||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||
|
@ -73,9 +74,11 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
|
|||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
tqDebug("task start to execute");
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqDebug("task execute end, get %p", pDataBlock);
|
||||
|
||||
if (pDataBlock != NULL) {
|
||||
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
||||
|
@ -97,7 +100,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
|
|||
}
|
||||
|
||||
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
||||
qStreamPrepareScan(task, pOffset);
|
||||
continue;
|
||||
}
|
||||
|
@ -116,7 +119,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
|
|||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
|
||||
}
|
||||
|
||||
tqDebug("task exec exited");
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
|
|||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
|
@ -32,6 +33,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
|||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
|
@ -78,19 +80,25 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqHandle(&decoder, &handle);
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||
}
|
||||
/*for (int32_t i = 0; i < 5; i++) {*/
|
||||
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
|
||||
/*}*/
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle reader = {
|
||||
.tqReader = handle.execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
.initTableReader = true,
|
||||
.initTqReader = true,
|
||||
.version = handle.snapshotVer,
|
||||
};
|
||||
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
|
||||
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
|
||||
ASSERT(handle.execHandle.execCol.task[i]);
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner);
|
||||
ASSERT(scanner);
|
||||
handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
|
||||
ASSERT(handle.execHandle.pExecReader[i]);
|
||||
}
|
||||
} else {
|
||||
handle.execHandle.execDb.pFilterOutTbUid =
|
||||
|
|
|
@ -106,6 +106,30 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
|||
return code;
|
||||
}
|
||||
|
||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||
if (msg == NULL) {
|
||||
// TODO create raw scan
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct SSubplan* plan = NULL;
|
||||
int32_t code = qStringToSubplan(msg, &plan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO: destroy SSubplan & pTaskInfo
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pTaskInfo;
|
||||
}
|
||||
|
||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
||||
if (msg == NULL) {
|
||||
return NULL;
|
||||
|
@ -186,7 +210,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
|
||||
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
|
||||
int32_t* tversion) {
|
||||
int32_t* tversion) {
|
||||
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
|
|
|
@ -269,13 +269,13 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
|
|||
|
||||
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
return pTaskInfo->streamInfo.metaBlk;
|
||||
}
|
||||
|
||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
|
||||
return 0;
|
||||
}
|
||||
|
@ -283,70 +283,70 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
|||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
||||
// TODO: optimize
|
||||
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
|
||||
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
|
||||
while (1) {
|
||||
uint8_t type = pOperator->operatorType;
|
||||
pOperator->status = OP_OPENED;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||
int64_t uid = pOffset->uid;
|
||||
int64_t ts = pOffset->ts;
|
||||
|
||||
if (uid == 0) {
|
||||
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||
uid = pTableInfo->uid;
|
||||
ts = INT64_MIN;
|
||||
if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||
|
||||
pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {
|
||||
while (1) {
|
||||
uint8_t type = pOperator->operatorType;
|
||||
pOperator->status = OP_OPENED;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
|
||||
pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < tableSz; i++) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||
if (pTableInfo->uid == uid) {
|
||||
found = true;
|
||||
pTableScanInfo->currentTable = i;
|
||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||
int64_t uid = pOffset->uid;
|
||||
int64_t ts = pOffset->ts;
|
||||
|
||||
if (uid == 0) {
|
||||
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||
uid = pTableInfo->uid;
|
||||
ts = INT64_MIN;
|
||||
}
|
||||
}
|
||||
if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
|
||||
pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
bool found = false;
|
||||
for (int32_t i = 0; i < tableSz; i++) {
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||
if (pTableInfo->uid == uid) {
|
||||
found = true;
|
||||
pTableScanInfo->currentTable = i;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO after dropping table, table may be not found
|
||||
ASSERT(found);
|
||||
// TODO after dropping table, table may be not found
|
||||
ASSERT(found);
|
||||
|
||||
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
pTableScanInfo->cond.twindows.skey = oldSkey;
|
||||
pTableScanInfo->scanTimes = 0;
|
||||
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
pTableScanInfo->cond.twindows.skey = oldSkey;
|
||||
pTableScanInfo->scanTimes = 0;
|
||||
|
||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||
pTableScanInfo->currentTable, tableSz);
|
||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||
pTableScanInfo->currentTable, tableSz);
|
||||
}
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
ASSERT(pOperator->numOfDownstream == 1);
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
ASSERT(pOperator->numOfDownstream == 1);
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
}
|
||||
/*}*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
int32_t numOfRows = 0;
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
SInputColumnInfoData* pInputData = &pfCtx->input;
|
||||
|
||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
|
||||
pInputData->numOfRows);
|
||||
} else {
|
||||
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||
}
|
||||
|
@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
|
||||
// todo handle the json tag
|
||||
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
bool isNull = colDataIsNull_s(pInput, f);
|
||||
if (isNull) {
|
||||
colDataAppendNULL(pOutput, pResult->info.rows + f);
|
||||
|
@ -3250,6 +3251,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
||||
pOperator->status = OP_OPENED;
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3283,11 +3288,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
while (1) {
|
||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||
qDebug("projection call next");
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
// TODO optimize
|
||||
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
|
||||
qDebug("projection get null");
|
||||
|
||||
/*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
|
||||
doSetOperatorCompleted(pOperator);
|
||||
/*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
|
||||
/*pOperator->status = OP_RES_TO_RETURN;*/
|
||||
/*}*/
|
||||
break;
|
||||
}
|
||||
|
@ -3819,7 +3828,8 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t order = 0;
|
||||
int32_t scanFlag = 0;
|
||||
|
||||
|
@ -3874,9 +3884,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
// here we need to handle the existsed group results
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
||||
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
|
||||
|
||||
|
@ -3974,15 +3984,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
|
||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||
|
@ -4047,8 +4057,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo =
|
||||
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
||||
|
@ -4056,18 +4066,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||
|
|
|
@ -1208,6 +1208,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
/*return NULL;*/
|
||||
/*}*/
|
||||
|
||||
qDebug("stream scan called");
|
||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
while (1) {
|
||||
SFetchRet ret = {0};
|
||||
|
@ -1229,6 +1230,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||
qDebug("stream scan return null");
|
||||
return NULL;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
@ -1257,6 +1259,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
// TODO move into scan
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
switch (pBlock->info.type) {
|
||||
case STREAM_NORMAL:
|
||||
case STREAM_GET_ALL:
|
||||
return pBlock;
|
||||
case STREAM_RETRIEVE: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
||||
|
@ -1286,6 +1291,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
return pBlock;
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
qInfo("scan mode %d", pInfo->scanMode);
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
|
@ -1380,7 +1386,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
qInfo("scan rows: %d", pBlockInfo->rows);
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
|
||||
#if 0
|
||||
|
|
|
@ -1332,13 +1332,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
if (chIds && pPullDataMap) {
|
||||
SArray* chAy = *(SArray**)chIds;
|
||||
int32_t size = taosArrayGetSize(chAy);
|
||||
qInfo("window %" PRId64 " wait child size:%d", win.skey, size);
|
||||
qDebug("window %" PRId64 " wait child size:%d", win.skey, size);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
qInfo("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
||||
qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
|
||||
}
|
||||
continue;
|
||||
} else if (pPullDataMap) {
|
||||
qInfo("close window %" PRId64, win.skey);
|
||||
qDebug("close window %" PRId64, win.skey);
|
||||
}
|
||||
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
|
@ -2491,8 +2491,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
if (IS_FINAL_OP(pInfo)) {
|
||||
forwardRows = 1;
|
||||
} else {
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
|
||||
NULL, TSDB_ORDER_ASC);
|
||||
}
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
||||
saveResultRow(pResult, tableGroupId, pUpdated);
|
||||
|
@ -2609,6 +2609,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
|
@ -2659,7 +2661,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
||||
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
qInfo("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
|
||||
|
|
|
@ -173,7 +173,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId);
|
||||
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
|
||||
pReq->upstreamTaskId);
|
||||
|
||||
// 1. handle input
|
||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||
|
@ -208,7 +209,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||
|
||||
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
||||
qInfo("task %d receive dispatch rsp", pTask->taskId);
|
||||
|
||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
|
|
@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
}
|
||||
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
|
||||
|
||||
qDebug("stream continue dispatching: task %d", pTask->taskId);
|
||||
qInfo("stream continue dispatching: task %d", pTask->taskId);
|
||||
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
|
|
|
@ -26,10 +26,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
|||
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
ASSERT(pTask->isDataScan);
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||
qInfo("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
|
||||
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
|
||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
||||
SArray* blocks = pBlock->blocks;
|
||||
qInfo("task %d %p set ssdata input", pTask->taskId, pTask);
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
|
||||
} else if (pItem->type == STREAM_INPUT__DROP) {
|
||||
// TODO exec drop
|
||||
|
|
|
@ -66,6 +66,7 @@ void walCloseReader(SWalReader *pRead) {
|
|||
}
|
||||
|
||||
int32_t walNextValidMsg(SWalReader *pRead) {
|
||||
wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId);
|
||||
int64_t fetchVer = pRead->curVersion;
|
||||
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
|
||||
while (fetchVer <= endVer) {
|
||||
|
@ -176,7 +177,7 @@ int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
|
||||
wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver);
|
||||
|
||||
pRead->curVersion = ver;
|
||||
return 0;
|
||||
|
@ -242,6 +243,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
pRead->curInvalid = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -301,6 +303,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
|||
int64_t code;
|
||||
|
||||
ASSERT(pRead->curVersion == pRead->pHead->head.version);
|
||||
ASSERT(pRead->curInvalid == 0);
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
|
@ -404,6 +407,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
|||
}
|
||||
|
||||
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver);
|
||||
int64_t contLen;
|
||||
bool seeked = false;
|
||||
|
||||
|
|
Loading…
Reference in New Issue