diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 783193db49..630e983f81 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -42,25 +42,28 @@ typedef struct SReadHandle { bool initTqReader; } SReadHandle; +// in queue mode, data streams are seperated by msg typedef enum { OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_STREAM = 0x2, + OPTR_EXEC_MODEL_QUEUE = 0x3, } EOPTR_EXEC_MODEL; /** - * Create the exec task for streaming mode + * Create the exec task for stream mode * @param pMsg - * @param streamReadHandle + * @param SReadHandle * @return */ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); /** - * Switch the stream scan to snapshot mode - * @param tinfo + * Create the exec task for queue mode + * @param pMsg + * @param SReadHandle * @return */ -int32_t qStreamScanSnapshot(qTaskInfo_t tinfo); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers); /** * Set the input data block for the stream scan. @@ -111,7 +114,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @return */ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion); + int32_t* tversion); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d6cb2c27b0..ac9784b91b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -14,6 +14,7 @@ */ #include "os.h" +#include "query.h" #include "tdatablock.h" #include "tmsg.h" #include "tmsgcb.h" @@ -119,6 +120,7 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); if (dequeueFlag == STREAM_QUEUE__FAILED) { + ASSERT(0); ASSERT(queue->qItem != NULL); return streamQueueCurItem(queue); } else { @@ -305,6 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); return -1; } + qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/include/util/tlog.h b/include/util/tlog.h index a8c9eeabde..d186c32841 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons #define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); } // clang-format on -#define BUF_PAGE_DEBUG +//#define BUF_PAGE_DEBUG #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 110b839216..b0542e350f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1149,11 +1149,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - /*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/ } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); - memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp); + memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } taosMemoryFree(pMsg->pData); @@ -2427,15 +2426,15 @@ static void destroyCreateTbReqBatch(void* data) { taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ - SVCreateTbBatchReq req = {0}; - SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; - SRequestObj *pRequest = NULL; - SQuery *pQuery = NULL; - SHashObj *pVgroupHashmap = NULL; +static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { + SVCreateTbBatchReq req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = NULL; + SQuery* pQuery = NULL; + SHashObj* pVgroupHashmap = NULL; - code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest); + code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -2455,8 +2454,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){ STscObj* pTscObj = pRequest->pTscObj; - SVCreateTbReq *pCreateReq = NULL; - SCatalog* pCatalog = NULL; + SVCreateTbReq* pCreateReq = NULL; + SCatalog* pCatalog = NULL; code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -2540,13 +2539,13 @@ static void destroyDropTbReqBatch(void* data) { taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ - SVDropTbBatchReq req = {0}; - SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; - SRequestObj *pRequest = NULL; - SQuery *pQuery = NULL; - SHashObj *pVgroupHashmap = NULL; +static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { + SVDropTbBatchReq req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = NULL; + SQuery* pQuery = NULL; + SHashObj* pVgroupHashmap = NULL; code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); if (code != TSDB_CODE_SUCCESS) { @@ -2568,8 +2567,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){ STscObj* pTscObj = pRequest->pTscObj; - SVDropTbReq *pDropReq = NULL; - SCatalog *pCatalog = NULL; + SVDropTbReq* pDropReq = NULL; + SCatalog* pCatalog = NULL; code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -2640,17 +2639,16 @@ end: return code; } -static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ - SVAlterTbReq req = {0}; - SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; - SRequestObj *pRequest = NULL; - SQuery *pQuery = NULL; - SArray *pArray = NULL; - SVgDataBlocks *pVgData = NULL; +static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { + SVAlterTbReq req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = NULL; + SQuery* pQuery = NULL; + SArray* pArray = NULL; + SVgDataBlocks* pVgData = NULL; - - code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest); + code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest); if (code != TSDB_CODE_SUCCESS) { goto end; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 38f46b9b11..52cb590ecc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1736,56 +1736,57 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; - len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "|\n", flag, - (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId); + len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag, + (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, + pDataBlock->info.uid); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { len += snprintf(dumpBuf + len, size - len, "%s |", flag); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) { len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; continue; } switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_INT: len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_UINT: len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_BIGINT: len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_UBIGINT: len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_FLOAT: len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_DOUBLE: len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; break; } } len += snprintf(dumpBuf + len, size - len, "\n"); - if (len >= size -1) return dumpBuf; + if (len >= size - 1) return dumpBuf; } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); return dumpBuf; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f19d17d034..53476a6a23 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,7 +412,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1); + tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 4, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e08aa91459..3bc954586c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5476,6 +5476,11 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { ASSERT(0); // TODO return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; + } else { + ASSERT(0); + /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ + /*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/ + /*return true;*/ } } return false; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 8abaac6dff..c62b7e95bf 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -89,8 +89,6 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; - // TODO remove it - int64_t tsdbEndVer; } STqExecHandle; @@ -101,6 +99,8 @@ typedef struct { int32_t epoch; int8_t fetchMeta; + int64_t snapshotVer; + // TODO remove SWalReader* pWalReader; @@ -131,7 +131,7 @@ typedef struct { static STqMgmt tqMgmt = {0}; // tqRead -int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset); +int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbb972fafe..621df3edd5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -321,7 +321,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { fetchOffsetNew.version++; } - if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { + if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { ASSERT(0); code = -1; goto OVER; @@ -480,30 +480,28 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->fetchMeta = req.withMeta; pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - /*for (int32_t i = 0; i < 5; i++) {*/ - /*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ - /*}*/ + + // TODO version should be assigned in preprocess int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; + pHandle->snapshotVer = ver; req.qmsg = NULL; for (int32_t i = 0; i < 5; i++) { SReadHandle handle = { - .tqReader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTableReader = true, .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); + pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); ASSERT(scanner); pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader[i]); - pHandle->execHandle.tsdbEndVer = ver; } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { for (int32_t i = 0; i < 5; i++) { diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 54e46e7b9a..ae5499af11 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -59,8 +59,9 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { return 0; } -int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { - qTaskInfo_t task = pExec->execCol.task[0]; +int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { + const STqExecHandle* pExec = &pHandle->execHandle; + qTaskInfo_t task = pExec->execCol.task[0]; if (qStreamPrepareScan(task, pOffset) < 0) { ASSERT(pOffset->type == TMQ_OFFSET__LOG); @@ -73,9 +74,11 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; + tqDebug("task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } + tqDebug("task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { tqAddBlockDataToRsp(pDataBlock, pRsp); @@ -97,7 +100,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset } if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1); + tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1); qStreamPrepareScan(task, pOffset); continue; } @@ -116,7 +119,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); } - + tqDebug("task exec exited"); break; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 67fa4ed166..e6df58696d 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -19,6 +19,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1; + if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1; if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1; if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -32,6 +33,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1; + if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1; if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1; if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -78,19 +80,25 @@ int32_t tqMetaOpen(STQ* pTq) { tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - for (int32_t i = 0; i < 5; i++) { - handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode); - } + /*for (int32_t i = 0; i < 5; i++) {*/ + /*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/ + /*}*/ if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { for (int32_t i = 0; i < 5; i++) { SReadHandle reader = { - .tqReader = handle.execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, .vnode = pTq->pVnode, + .initTableReader = true, + .initTqReader = true, + .version = handle.snapshotVer, }; - handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); + handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); ASSERT(handle.execHandle.execCol.task[i]); + void* scanner = NULL; + qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); + ASSERT(scanner); + handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner); + ASSERT(handle.execHandle.pExecReader[i]); } } else { handle.execHandle.execDb.pFilterOutTbUid = diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9f6b12c13a..6e4f02527f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -106,6 +106,30 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) { + if (msg == NULL) { + // TODO create raw scan + return NULL; + } + + struct SSubplan* plan = NULL; + int32_t code = qStringToSubplan(msg, &plan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + qTaskInfo_t pTaskInfo = NULL; + code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); + if (code != TSDB_CODE_SUCCESS) { + // TODO: destroy SSubplan & pTaskInfo + terrno = code; + return NULL; + } + + return pTaskInfo; +} + qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { if (msg == NULL) { return NULL; @@ -186,7 +210,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion) { + int32_t* tversion) { ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index b30800680b..5d2f9532b4 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -269,13 +269,13 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) { void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); return pTaskInfo->streamInfo.metaBlk; } int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal)); return 0; } @@ -283,70 +283,70 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; // TODO: optimize - /*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/ - /*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/ - while (1) { - uint8_t type = pOperator->operatorType; - pOperator->status = OP_OPENED; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pInfo = pOperator->info; - if (pOffset->type == TMQ_OFFSET__LOG) { - if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { - return -1; - } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ - int64_t uid = pOffset->uid; - int64_t ts = pOffset->ts; - - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; + if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type || + pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) { + while (1) { + uint8_t type = pOperator->operatorType; + pOperator->status = OP_OPENED; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pOperator->info; + if (pOffset->type == TMQ_OFFSET__LOG) { + if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { + return -1; } - } - if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA || - pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) { - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + int64_t uid = pOffset->uid; + int64_t ts = pOffset->ts; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; } } + if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA || + pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) { + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pTableScanInfo->currentTable = i; + } + } - // TODO after dropping table, table may be not found - ASSERT(found); + // TODO after dropping table, table may be not found + ASSERT(found); - tsdbSetTableId(pTableScanInfo->dataReader, uid); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; + tsdbSetTableId(pTableScanInfo->dataReader, uid); + int64_t oldSkey = pTableScanInfo->cond.twindows.skey; + pTableScanInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pTableScanInfo->cond.twindows.skey = oldSkey; + pTableScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, - pTableScanInfo->currentTable, tableSz); + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pTableScanInfo->currentTable, tableSz); + } + + } else { + ASSERT(0); } - + return 0; } else { - ASSERT(0); + ASSERT(pOperator->numOfDownstream == 1); + pOperator = pOperator->pDownstream[0]; } - return 0; - } else { - ASSERT(pOperator->numOfDownstream == 1); - pOperator = pOperator->pDownstream[0]; } } - /*}*/ return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 29818e56bb..e5be74d948 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t numOfRows = 0; for (int32_t k = 0; k < numOfOutput; ++k) { - int32_t outputSlotId = pExpr[k].base.resSchema.slotId; - SqlFunctionCtx* pfCtx = &pCtx[k]; + int32_t outputSlotId = pExpr[k].base.resSchema.slotId; + SqlFunctionCtx* pfCtx = &pCtx[k]; SInputColumnInfoData* pInputData = &pfCtx->input; if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows); + colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], + pInputData->numOfRows); } else { colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); } @@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } else if (fmIsAggFunc(pfCtx->functionId)) { // _group_key function for "partition by tbname" + csum(col_name) query SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t slotId = pfCtx->param[0].pCol->slotId; + int32_t slotId = pfCtx->param[0].pCol->slotId; // todo handle the json tag SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId); - for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) { + for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) { bool isNull = colDataIsNull_s(pInput, f); if (isNull) { colDataAppendNULL(pOutput, pResult->info.rows + f); @@ -3250,6 +3251,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_EXEC_DONE) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { + pOperator->status = OP_OPENED; + return NULL; + } return NULL; } @@ -3283,11 +3288,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { while (1) { // The downstream exec may change the value of the newgroup, so use a local variable instead. + qDebug("projection call next"); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - // TODO optimize - /*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/ + qDebug("projection get null"); + + /*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/ doSetOperatorCompleted(pOperator); + /*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/ + /*pOperator->status = OP_RES_TO_RETURN;*/ /*}*/ break; } @@ -3819,7 +3828,8 @@ _error: return NULL; } -static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) { +static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, + SExecTaskInfo* pTaskInfo) { int32_t order = 0; int32_t scanFlag = 0; @@ -3874,9 +3884,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - while(1) { + while (1) { // here we need to handle the existsed group results - if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method + if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method for (int32_t k = 0; k < pSup->numOfExprs; ++k) { SqlFunctionCtx* pCtx = &pSup->pCtx[k]; @@ -3974,15 +3984,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); - pInfo->binfo.pRes = pResBlock; - pInfo->pCondition = pPhyNode->node.pConditions; - pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); + pInfo->binfo.pRes = pResBlock; + pInfo->pCondition = pPhyNode->node.pConditions; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - pOperator->name = "IndefinitOperator"; + pOperator->name = "IndefinitOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, @@ -4047,8 +4057,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, + &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); @@ -4056,18 +4066,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = pResBlock; - pInfo->multigroupResult = multigroupResult; - pInfo->pCondition = pPhyFillNode->node.pConditions; - pInfo->pColMatchColInfo = pColMatchColInfo; - pOperator->name = "FillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->exprSupp.pExprInfo = pExprInfo; + pInfo->pRes = pResBlock; + pInfo->multigroupResult = multigroupResult; + pInfo->pCondition = pPhyFillNode->node.pConditions; + pInfo->pColMatchColInfo = pColMatchColInfo; + pOperator->name = "FillOperator"; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; + pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = num; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0194cd78dc..8568ad02b1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1208,6 +1208,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { /*return NULL;*/ /*}*/ + qDebug("stream scan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; @@ -1229,6 +1230,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } else if (ret.fetchType == FETCH_TYPE__NONE) { pTaskInfo->streamInfo.lastStatus = ret.offset; ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version); + qDebug("stream scan return null"); return NULL; } else { ASSERT(0); @@ -1257,6 +1259,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // TODO move into scan blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { + case STREAM_NORMAL: + case STREAM_GET_ALL: + return pBlock; case STREAM_RETRIEVE: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; @@ -1286,6 +1291,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { + qInfo("scan mode %d", pInfo->scanMode); if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { blockDataDestroy(pInfo->pUpdateRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; @@ -1380,7 +1386,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } } } - + qInfo("scan rows: %d", pBlockInfo->rows); return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; #if 0 diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 947d10dcb4..0d18c47cab 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1332,13 +1332,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, if (chIds && pPullDataMap) { SArray* chAy = *(SArray**)chIds; int32_t size = taosArrayGetSize(chAy); - qInfo("window %" PRId64 " wait child size:%d", win.skey, size); + qDebug("window %" PRId64 " wait child size:%d", win.skey, size); for (int32_t i = 0; i < size; i++) { - qInfo("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); + qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i)); } continue; } else if (pPullDataMap) { - qInfo("close window %" PRId64, win.skey); + qDebug("close window %" PRId64, win.skey); } SResultRowPosition* pPos = (SResultRowPosition*)pIte; if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -2491,8 +2491,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc if (IS_FINAL_OP(pInfo)) { forwardRows = 1; } else { - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, + NULL, TSDB_ORDER_ASC); } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { saveResultRow(pResult, tableGroupId, pUpdated); @@ -2609,6 +2609,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; + qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { @@ -2659,7 +2661,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearSpecialDataBlock(pInfo->pUpdateRes); removeDeleteResults(pUpdated, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; - qInfo("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); break; } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv"); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 29e0f7ded0..ed85ce31c3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -173,7 +173,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId); + qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, + pReq->upstreamTaskId); // 1. handle input streamTaskEnqueue(pTask, pReq, pRsp); @@ -208,7 +209,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - qDebug("task %d receive dispatch rsp", pTask->taskId); + qInfo("task %d receive dispatch rsp", pTask->taskId); int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 98b0874b00..8034840fce 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { } ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - qDebug("stream continue dispatching: task %d", pTask->taskId); + qInfo("stream continue dispatching: task %d", pTask->taskId); SRpcMsg dispatchMsg = {0}; SEpSet* pEpSet = NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d178c19615..1286b4c69e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -26,10 +26,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->isDataScan); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + qInfo("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SArray* blocks = pBlock->blocks; + qInfo("task %d %p set ssdata input", pTask->taskId, pTask); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DROP) { // TODO exec drop diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index eb0c7f56bd..908523f2a6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -66,6 +66,7 @@ void walCloseReader(SWalReader *pRead) { } int32_t walNextValidMsg(SWalReader *pRead) { + wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId); int64_t fetchVer = pRead->curVersion; int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal); while (fetchVer <= endVer) { @@ -176,7 +177,7 @@ int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) { return -1; } - wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver); + wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver); pRead->curVersion = ver; return 0; @@ -242,6 +243,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { return -1; } } + pRead->curInvalid = 0; return 0; } @@ -301,6 +303,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { int64_t code; ASSERT(pRead->curVersion == pRead->pHead->head.version); + ASSERT(pRead->curInvalid == 0); code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -404,6 +407,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } int32_t walReadVer(SWalReader *pRead, int64_t ver) { + wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver); int64_t contLen; bool seeked = false;