diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 69677e6bc1..1158a46161 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2408,6 +2408,11 @@ typedef struct SColLocation { int8_t type; } SColLocation; +typedef struct SVgroupVer { + int32_t vgId; + int64_t ver; +} SVgroupVer; + typedef struct { char name[TSDB_STREAM_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN]; @@ -2431,6 +2436,7 @@ typedef struct { int64_t deleteMark; int8_t igUpdate; int64_t lastTs; + SArray* pVgroupVerList; } SCMCreateStreamReq; typedef struct { diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index dc145819ca..fc9b88340f 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -18,7 +18,7 @@ // message process int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); -int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); +int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index a0b5d938e3..3c5f23af6b 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -126,6 +126,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_TAGS, FUNCTION_TYPE_TBUID, FUNCTION_TYPE_VGID, + FUNCTION_TYPE_VGVER, // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index bb206b5a02..6a41f4607b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -106,7 +106,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata const struct SMetaData* pMetaData, SQuery* pQuery); int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, SQuery* pQuery); -int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow); +int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock* pBlock); void qDestroyParseContext(SParseContext* pCxt); diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 789ba554e2..5e946357db 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -96,9 +96,7 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t qPseudoTagFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); /* Aggregation functions */ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 747ba34c97..9738be839d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -531,7 +531,7 @@ typedef struct SStreamMeta { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9800d233e9..71f6a466b9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -880,19 +880,21 @@ static bool incompletaFileParsing(SNode* pStmt) { return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing; } -void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { +void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) { SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; - int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); + + int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); if (TSDB_CODE_SUCCESS == code) { int64_t analyseStart = taosGetTimestampUs(); - code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); + code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock); pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; } + if (TSDB_CODE_SUCCESS == code) { code = qContinuePlanPostQuery(pRequest->pPostPlan); } - nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId); + nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId); handleQueryAnslyseRes(pWrapper, NULL, code); } @@ -916,6 +918,43 @@ void returnToUser(SRequestObj* pRequest) { } } +static SSDataBlock* createResultBlock(TAOS_RES* pRes, int32_t numOfRows) { + int64_t lastTs = 0; + + TAOS_FIELD* pResFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + SSDataBlock* pBlock = createDataBlock(); + + for(int32_t i = 0; i < numOfFields; ++i) { + SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1); + blockDataAppendColInfo(pBlock, &colInfoData); + } + + blockDataEnsureCapacity(pBlock, numOfRows); + + for (int32_t i = 0; i < numOfRows; ++i) { + TAOS_ROW pRow = taos_fetch_row(pRes); + int64_t ts = *(int64_t*)pRow[0]; + if (lastTs < ts) { + lastTs = ts; + } + + for(int32_t j = 0; j < numOfFields; ++j) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); + colDataSetVal(pColInfoData, i, pRow[j], false); + } + + tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]); + } + + pBlock->info.window.ekey = lastTs; + pBlock->info.rows = numOfRows; + + tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows); + return pBlock; +} + void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { SRequestObj* pRequest = (SRequestObj*)res; if (pRequest->code) { @@ -923,14 +962,10 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) { return; } - TAOS_ROW row = NULL; - if (rowNum > 0) { - row = taos_fetch_row(res); // for single row only now - } - + SSDataBlock* pBlock = createResultBlock(res, rowNum); SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); if (pNextReq) { - continuePostSubQuery(pNextReq, row); + continuePostSubQuery(pNextReq, pBlock); releaseRequest(pRequest->relation.nextRefId); } else { tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self, diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e6519a436e..c1adc49d48 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -829,7 +829,10 @@ TEST(clientCase, projection_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)"); +// TAOS_RES* pRes = taos_query(pConn, "select tbname, last(ts) from abc1.stable_1 group by tbname"); +// taos_free_result(pRes); + + pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3f1cfbc87f..2ab4c9ed8d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7217,6 +7217,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1; if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1; + if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1; for (int32_t i = 0; i < taosArrayGetSize(pReq->fillNullCols); ++i) { SColLocation *pCol = taosArrayGet(pReq->fillNullCols, i); @@ -7224,10 +7225,19 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI16(&encoder, pCol->colId) < 0) return -1; if (tEncodeI8(&encoder, pCol->type) < 0) return -1; } + if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1; if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1; if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1; + if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1; + + for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { + SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i); + if (tEncodeI32(&encoder, p->vgId) < 0) return -1; + if (tEncodeI64(&encoder, p->ver) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7238,6 +7248,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) { int32_t sqlLen = 0; int32_t astLen = 0; + int32_t numOfFillNullCols = 0; + int32_t numOfVgVer = 0; SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -7289,7 +7301,6 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea } if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1; if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1; - int32_t numOfFillNullCols = 0; if (tDecodeI32(&decoder, &numOfFillNullCols) < 0) return -1; if (numOfFillNullCols > 0) { pReq->fillNullCols = taosArrayInit(numOfFillNullCols, sizeof(SColLocation)); @@ -7314,9 +7325,28 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1; if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1; - tEndDecode(&decoder); + if (tDecodeI32(&decoder, &numOfVgVer) < 0) return -1; + if (numOfVgVer > 0) { + pReq->pVgroupVerList = taosArrayInit(numOfVgVer, sizeof(SVgroupVer)); + if (pReq->pVgroupVerList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + for (int32_t i = 0; i < numOfVgVer; ++i) { + SVgroupVer v = {0}; + if (tDecodeI32(&decoder, &v.vgId) < 0) return -1; + if (tDecodeI64(&decoder, &v.ver) < 0) return -1; + if (taosArrayPush(pReq->pVgroupVerList, &v) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + } + + tEndDecode(&decoder); tDecoderClear(&decoder); + return 0; } diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index cba52c6b45..3c51f34fee 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark); -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey); +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVerList); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 39121cecd9..1d8b2cf5d3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -234,36 +234,72 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) { - int64_t uid = (isFillhistory)? pStream->hTaskUid:pStream->uid; - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory); + int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; + SStreamTask* pTask = + tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - epsetAssign(&(pTask)->info.mnodeEpset, pEpset); - pTask->info.nodeId = vgId; pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); mndSetSinkTaskInfo(pStream, pTask); return 0; } +static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SVgroupVer* pVer = taosArrayGet(pList, i); + if (pVer->vgId == vgId) { + return pVer->ver; + } + } + + mError("failed to find the vgId:%d for extract last version", vgId); + return -1; +} + +static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { + int64_t latestVer = getVgroupLastVer(pVerList, vgId); + if (latestVer < 0) { + latestVer = 0; + } + + // set the correct ts, which is the last key of queried table. + SDataRange* pRange = &pTask->dataRange; + STimeWindow* pWindow = &pRange->window; + + if (pTask->info.fillHistory) { + pWindow->skey = INT64_MIN; + pWindow->ekey = skey - 1; + + pRange->range.minVer = 0; + pRange->range.maxVer = latestVer; + mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, + pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); + } else { + pWindow->skey = skey; + pWindow->ekey = INT64_MAX; + + pRange->range.minVer = latestVer + 1; + pRange->range.maxVer = INT64_MAX; + + mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, + pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); + } +} + static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, - SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory, - bool hasExtraSink, int64_t nextWindowSkey, bool hasFillHistory) { - SStreamTask* pTask = - tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory); + SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, int64_t skey, + SArray* pVerList, bool fillHistory, bool hasExtraSink, bool hasFillHistory) { + int64_t t = pStream->conf.triggerParam; + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, fillHistory, t, pTaskList, hasFillHistory); if (pTask == NULL) { return terrno; } - epsetAssign(&pTask->info.mnodeEpset, pEpset); - STimeWindow* pWindow = &pTask->dataRange.window; - - pWindow->skey = INT64_MIN; - pWindow->ekey = nextWindowSkey - 1; - mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); + streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); // sink or dispatch if (hasExtraSink) { @@ -308,7 +344,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { } static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) { + SEpSet* pEpset, bool hasExtraSink, int64_t skey, SArray* pVerList) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = addNewTaskList(pStream->tasks); SSdb* pSdb = pMnode->pSdb; @@ -345,8 +381,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, - false, hasExtraSink, nextWindowSkey, pStream->conf.fillHistory); + int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, skey, + pVerList, false, hasExtraSink, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -354,8 +390,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); - code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pEpset, true, hasExtraSink, nextWindowSkey, true); + code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pEpset, skey, + pVerList, true, hasExtraSink, true); } sdbRelease(pSdb, pVgroup); @@ -371,24 +407,17 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* return TSDB_CODE_SUCCESS; } -static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t uid, SStreamTask* pDownstreamTask, - SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset, - int64_t nextWindowSkey, bool hasFillHistory) { - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, isFillhistory, 0, pTaskList, hasFillHistory); +static int32_t addSourceTaskForMultiLevelStream(SArray* pTaskList, bool isFillhistory, int64_t uid, + SStreamTask* pDownstreamTask, SMnode* pMnode, SSubplan* pPlan, + SVgObj* pVgroup, SEpSet* pEpset, int64_t skey, SArray* pVerList, + bool hasFillHistory) { + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, 0, pTaskList, hasFillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - epsetAssign(&(pTask)->info.mnodeEpset, pEpset); - - // set the correct ts, which is the last key of queried table. - STimeWindow* pWindow = &pTask->dataRange.window; - pWindow->skey = INT64_MIN; - pWindow->ekey = nextWindowSkey - 1; - - mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel, - pWindow->skey, pWindow->ekey); + streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); // all the source tasks dispatch result to a single agg node. streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask); @@ -401,14 +430,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t ui static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) { - *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory); + *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory); if (*pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset); - // dispatch if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { return -1; @@ -492,7 +519,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream, SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, - SEpSet* pEpset, int64_t nextWindowSkey) { + SEpSet* pEpset, int64_t skey, SArray* pVerList) { SArray* pSourceTaskList = addNewTaskList(pStream->tasks); SArray* pHSourceTaskList = NULL; @@ -521,8 +548,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl continue; } - int32_t code = doAddSourceTask(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, - nextWindowSkey, pStream->conf.fillHistory); + int32_t code = addSourceTaskForMultiLevelStream(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, + skey, pVerList, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); terrno = code; @@ -530,8 +557,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset, - nextWindowSkey, pStream->conf.fillHistory); + code = addSourceTaskForMultiLevelStream(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset, + skey, pVerList, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return code; @@ -580,7 +607,8 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst } } -static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey, + SArray* pVerList) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -637,15 +665,15 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* } // source level - return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey); + return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, skey, pVerList); } else if (numOfPlanLevel == 1) { - return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey); + return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, skey, pVerList); } return 0; } -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) { +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) { SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -655,7 +683,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindo SEpSet mnodeEpset = {0}; mndGetMnodeEpSet(pMnode, &mnodeEpset); - int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey, &mnodeEpset); + int32_t code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList); qDestroyQueryPlan(pPlan); return code; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e6027a0332..cfd7ecf054 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -638,7 +638,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER; + if (mndScheduleStream(pMnode, &streamObj, 1685959190000, NULL) != 0) goto _OVER; if (mndPersistStream(pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0de951596f..6640e5dca3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -649,8 +649,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { return -1; } - SCMCreateStreamReq createStreamReq = {0}; - if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { + SCMCreateStreamReq createReq = {0}; + if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } @@ -659,17 +659,17 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; #endif - mInfo("stream:%s, start to create, sql:%s", createStreamReq.name, createStreamReq.sql); + mInfo("stream:%s, start to create, sql:%s", createReq.name, createReq.sql); - if (mndCheckCreateStreamReq(&createStreamReq) != 0) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + if (mndCheckCreateStreamReq(&createReq) != 0) { + mError("stream:%s, failed to create since %s", createReq.name, terrstr()); goto _OVER; } - pStream = mndAcquireStream(pMnode, createStreamReq.name); + pStream = mndAcquireStream(pMnode, createReq.name); if (pStream != NULL) { - if (createStreamReq.igExists) { - mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); + if (createReq.igExists) { + mInfo("stream:%s, already exist, ignore exist is set", createReq.name); goto _OVER; } else { terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; @@ -679,16 +679,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if (createStreamReq.sql != NULL) { - sqlLen = strlen(createStreamReq.sql); + if (createReq.sql != NULL) { + sqlLen = strlen(createReq.sql); sql = taosMemoryMalloc(sqlLen + 1); memset(sql, 0, sqlLen + 1); - memcpy(sql, createStreamReq.sql, sqlLen); + memcpy(sql, createReq.sql, sqlLen); } // build stream obj from request - if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq) < 0) { + mError("stream:%s, failed to create since %s", createReq.name, terrstr()); goto _OVER; } @@ -702,23 +702,23 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // create stb for stream - if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE && + if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { - mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); + mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; } // schedule stream task for stream obj - if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) { - mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); + if (mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList) < 0) { + mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; } // add stream to trans if (mndPersistStream(pTrans, &streamObj) < 0) { - mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); + mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; } @@ -749,10 +749,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); SName dbname = {0}; - tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); SName name = {0}; - tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // reuse this function for stream if (sql != NULL && sqlLen > 0) { @@ -765,11 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { _OVER: if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, terrstr()); } mndReleaseStream(pMnode, pStream); - tFreeSCMCreateStreamReq(&createStreamReq); + tFreeSCMCreateStreamReq(&createReq); tFreeStreamObj(&streamObj); if (sql != NULL) { taosMemoryFreeClear(sql); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ade1c8c41..8689c30a55 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,10 +738,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); return -1; - } else { - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); if (pTask->info.fillHistory) { restoreStreamTaskId(pTask, &taskId); } @@ -846,17 +845,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pTask->info.fillHistory) { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", + " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 + " ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); + (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer); } else { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", + " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 + " ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); } return 0; @@ -876,12 +877,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { - const char* id = pTask->id.idStr; - int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - - // if it's an source task, extract the last version in wal. + const char* id = pTask->id.idStr; + int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; SVersionRange* pRange = &pTask->dataRange.range; + // if it's an source task, extract the last version in wal. bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); pTask->execInfo.step2Start = taosGetTimestampMs(); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 21bc09eba0..c4973b7c1e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -49,7 +49,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { return 0; } -int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { +int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); @@ -547,7 +547,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, + tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); tFreeStreamTask(pTask); return code; @@ -562,9 +562,10 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve if (restored) { SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); - if (p != NULL && (p->info.fillHistory == 0)) { - tqStreamOneTaskStartAsync(pMeta, cb, streamId, taskId); + if ((p != NULL) && (p->info.fillHistory == 0)) { + tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId); } + if (p != NULL) { streamMetaReleaseTask(pMeta, p); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 609f38cead..9d158668d2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2294,6 +2294,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.dataLoad = 1; + pResBlock->info.version = pReader->info.verRange.maxVer; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); setComposedBlockFlag(pReader, true); @@ -2799,6 +2800,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pInfo->rows = pBlockInfo->numRow; pInfo->id.uid = pScanInfo->uid; pInfo->dataLoad = 0; + pInfo->version = pReader->info.verRange.maxVer; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 64c14456b6..fa178b6488 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -279,7 +279,6 @@ typedef struct STableScanInfo { int8_t assignBlockUid; uint8_t countState; // empty table count state bool hasGroupByTag; - bool countOnly; bool filesetDelimited; bool needCountEmptyTable; } STableScanInfo; @@ -757,10 +756,12 @@ extern void doDestroyExchangeOperatorInfo(void* param); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, - int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); + int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); +void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId); +void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 63fcfba7c1..e4fa9f7580 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -250,7 +250,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExprSupp* pSup = &pInfo->pseudoExprSup; int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + pRes->info.rows, pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -313,7 +313,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (taosArrayGetSize(pInfo->pUidList) > 0) { pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + pInfo->pRes->info.rows, pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1e9771edd6..87b8cd366c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -287,7 +287,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { terrno = TSDB_CODE_SUCCESS; int32_t num = 0; - uint64_t groupId = 0; for (int32_t j = 0; j < pBlock->info.rows; ++j) { // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (!pInfo->isInit) { @@ -478,20 +477,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; -#if 0 - if(pOperator->fpSet.encodeResultRow){ - char *result = NULL; - int32_t length = 0; - pOperator->fpSet.encodeResultRow(pOperator, &result, &length); - SAggSupporter* pSup = &pInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pInfo->binfo.resultRowInfo.size = 0; - pOperator->fpSet.decodeResultRow(pOperator, result); - if(result){ - taosMemoryFree(result); - } - } -#endif // initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ec1d6b9f75..8e54fc684b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -21,7 +21,6 @@ #include "querynodes.h" #include "systable.h" #include "tname.h" -#include "ttime.h" #include "tdatablock.h" #include "tmsg.h" @@ -252,7 +251,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo SExprSupp* pSup = &pTableScanInfo->pseudoSup; int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, - GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache); + pTaskInfo, &pTableScanInfo->metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { T_LONG_JMP(pTaskInfo->env, code); @@ -295,8 +294,8 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; @@ -482,24 +481,25 @@ static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t } int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, - int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) { + int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache) { + int32_t code = 0; + bool freeReader = false; + LRUHandle* h = NULL; + STableCachedVal val = {0}; + SMetaReader mr = {0}; + const char* idStr = pTask->id.str; + // currently only the tbname pseudo column if (numOfExpr <= 0) { return TSDB_CODE_SUCCESS; } - int32_t code = 0; - bool freeReader = false; + // todo: opt if only require the vgId and the vgVer; // backup the rows int32_t backupRows = pBlock->info.rows; pBlock->info.rows = rows; - STableCachedVal val = {0}; - - SMetaReader mr = {0}; - LRUHandle* h = NULL; - // todo refactor: extract method // the handling of the null data should be packed in the extracted method @@ -586,7 +586,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int // this is to handle the tbname if (fmIsScanPseudoColumnFunc(functionId)) { - setTbNameColData(pBlock, pColInfoData, functionId, val.pName); + int32_t fType = pExpr1->pExpr->_function.functionType; + if (fType == FUNCTION_TYPE_TBNAME) { + setTbNameColData(pBlock, pColInfoData, functionId, val.pName); + } else if (fType == FUNCTION_TYPE_VGID) { + setVgIdColData(pBlock, pColInfoData, functionId, pTask->id.vgId); + } else if (fType == FUNCTION_TYPE_VGVER) { + setVgVerColData(pBlock, pColInfoData, functionId, pBlock->info.version); + } } else { // these are tags STagVal tagVal = {0}; tagVal.cid = pExpr1->base.pParam[0].pCol->colId; @@ -655,6 +662,47 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, colDataDestroy(&infoData); } +void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId) { + struct SScalarFuncExecFuncs fpSet = {0}; + fmGetScalarFuncExecFuncs(functionId, &fpSet); + + SColumnInfoData infoData = createColumnInfoData(pColInfoData->info.type, pColInfoData->info.bytes, 1); + + colInfoDataEnsureCapacity(&infoData, 1, false); + colDataSetVal(&infoData, 0, (const char*)&vgId, false); + + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData}; + SScalarParam param = {.columnData = pColInfoData}; + + if (fpSet.process != NULL) { + fpSet.process(&srcParam, 1, ¶m); + } else { + qError("failed to get the corresponding callback function, functionId:%d", functionId); + } + + colDataDestroy(&infoData); +} + +void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer) { + struct SScalarFuncExecFuncs fpSet = {0}; + fmGetScalarFuncExecFuncs(functionId, &fpSet); + + SColumnInfoData infoData = createColumnInfoData(pColInfoData->info.type, pColInfoData->info.bytes, 1); + + colInfoDataEnsureCapacity(&infoData, 1, false); + colDataSetVal(&infoData, 0, (const char*)&vgVer, false); + + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData}; + SScalarParam param = {.columnData = pColInfoData}; + + if (fpSet.process != NULL) { + fpSet.process(&srcParam, 1, ¶m); + } else { + qError("failed to get the corresponding callback function, functionId:%d", functionId); + } + + colDataDestroy(&infoData); +} static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) { tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); @@ -963,13 +1011,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { - int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } - initNextGroupScan(pInfo, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); @@ -1172,10 +1218,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - if (scanDebug) { - pInfo->countOnly = true; - } - pInfo->filesetDelimited = pTableScanNode->filesetDelimited; taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); @@ -1901,7 +1943,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pBlockInfo->rows, id, &pTableScanInfo->base.metaCache); + pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -2134,7 +2176,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; - qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, + qDebug("stream scan step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1; pStreamInfo->recoverScanFinished = false; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4d00b6bb77..ac26f6fe26 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -707,18 +707,20 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ static int32_t translateTbUidColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters - pFunc->node.resType = - (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } static int32_t translateVgIdColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters - pFunc->node.resType = - (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes, .type = TSDB_DATA_TYPE_INT}; return TSDB_CODE_SUCCESS; } +static int32_t translateVgVerColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + return TSDB_CODE_SUCCESS; +} static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); @@ -3453,7 +3455,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateTbnameColumn, .getEnvFunc = NULL, .initFunc = NULL, - .sprocessFunc = qTbnameFunction, + .sprocessFunc = qPseudoTagFunction, .finalizeFunc = NULL }, { @@ -3740,11 +3742,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateTbUidColumn, .getEnvFunc = NULL, .initFunc = NULL, -#ifdef BUILD_NO_CALL - .sprocessFunc = qTbUidFunction, -#else - .sprocessFunc = NULL, -#endif + .sprocessFunc = qPseudoTagFunction, .finalizeFunc = NULL }, { @@ -3754,11 +3752,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateVgIdColumn, .getEnvFunc = NULL, .initFunc = NULL, -#ifdef BUILD_NO_CALL - .sprocessFunc = qVgIdFunction, -#else - .sprocessFunc = NULL, -#endif + .sprocessFunc = qPseudoTagFunction, .finalizeFunc = NULL }, { @@ -3781,6 +3775,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = toCharFunction, .finalizeFunc = NULL }, + { + .name = "_vgver", + .type = FUNCTION_TYPE_VGVER, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateVgVerColumn, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = qPseudoTagFunction, + .finalizeFunc = NULL + } }; // clang-format on diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8496feb4d6..598a8dfd61 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -31,7 +31,7 @@ typedef struct SNodeMemChunk { struct SNodeMemChunk* pNext; } SNodeMemChunk; -typedef struct SNodeAllocator { +struct SNodeAllocator { int64_t self; int64_t queryId; int32_t chunkSize; @@ -39,7 +39,7 @@ typedef struct SNodeAllocator { SNodeMemChunk* pCurrChunk; SNodeMemChunk* pChunks; TdThreadMutex mutex; -} SNodeAllocator; +}; static threadlocal SNodeAllocator* g_pNodeAllocator; static int32_t g_allocatorReqRefPool = -1; diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index a4a7812474..f1c549e154 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -35,8 +35,8 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMetaCache); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); -int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); -int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); +int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock); +int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock); int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues); int32_t translateTable(STranslateContext* pCxt, SNode** pTable); int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d246641576..922ad30f39 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -14,6 +14,7 @@ */ #include "parTranslater.h" +#include "tdatablock.h" #include "parInt.h" #include "catalog.h" @@ -6957,7 +6958,7 @@ int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval* return TSDB_CODE_SUCCESS; } -int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) { +int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot; int64_t lastTs = 0; @@ -6967,11 +6968,13 @@ int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, vo if (TSDB_CODE_SUCCESS == code) { code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval); } + if (TSDB_CODE_SUCCESS == code) { - if (pResRow && pResRow[0]) { - lastTs = *(int64_t*)pResRow[0]; + if (pBlock != NULL && pBlock->info.rows >= 1) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + lastTs = *(int64_t*)colDataGetData(pColInfo, 0); } else if (interval.interval > 0) { - lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision); + lastTs = taosGetTimestamp(interval.precision); } else { lastTs = taosGetTimestampMs(); } @@ -8110,18 +8113,63 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta nodesDestroyList(pParamterList); return TSDB_CODE_OUT_OF_MEMORY; } + code = nodesListStrictAppend(pProjectionList, pFunc); if (code) { nodesDestroyList(pProjectionList); return code; } + SFunctionNode* pFunc1 = createFunction("_vgid", NULL); + if (NULL == pFunc1) { + nodesDestroyList(pParamterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + snprintf(pFunc1->node.aliasName, sizeof(pFunc1->node.aliasName), "%s.%p", pFunc1->functionName, pFunc1); + code = nodesListStrictAppend(pProjectionList, (SNode*) pFunc1); + if (code) { + nodesDestroyList(pProjectionList); + return code; + } + + SFunctionNode* pFunc2 = createFunction("_vgver", NULL); + if (NULL == pFunc2) { + nodesDestroyList(pParamterList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + snprintf(pFunc2->node.aliasName, sizeof(pFunc2->node.aliasName), "%s.%p", pFunc2->functionName, pFunc2); + code = nodesListStrictAppend(pProjectionList, (SNode*) pFunc2); + if (code) { + nodesDestroyList(pProjectionList); + return code; + } + code = createSimpleSelectStmtFromProjList(pDb, pTable, pProjectionList, (SSelectStmt**)pQuery); if (code) { nodesDestroyList(pProjectionList); return code; } + // todo add the group by statement + SSelectStmt** pSelect1 = (SSelectStmt**)pQuery; + (*pSelect1)->pGroupByList = nodesMakeList(); + + SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + pNode1->groupingSetType = GP_TYPE_NORMAL; + pNode1->pParameterList = nodesMakeList(); + nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1); + + nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1); + + SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + pNode2->groupingSetType = GP_TYPE_NORMAL; + pNode2->pParameterList = nodesMakeList(); + nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2); + + nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2); + return code; } @@ -8269,7 +8317,32 @@ static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* return code; } -int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow) { +// ts, vgroup_id, vgroup_version +static int32_t createStreamReqVersionInfo(SSDataBlock* pBlock, SArray** pArray, int64_t* lastTs, SInterval* pInterval) { + *pArray = taosArrayInit(pBlock->info.rows, sizeof(SVgroupVer)); + if (*pArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (pBlock->info.rows > 0) { + *lastTs = pBlock->info.window.ekey; + SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, 1); + SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 2); + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + SVgroupVer v = {.vgId = *(int32_t*)colDataGetData(pCol1, i), .ver = *(int64_t*)colDataGetData(pCol2, i)}; + parserDebug("-------------%ld, vgId:%d, vgVer:%ld\n", *lastTs, v.vgId, v.ver); + taosArrayPush(*pArray, &v); + } + } else { + int32_t precision = (pInterval->interval > 0)? pInterval->precision:TSDB_TIME_PRECISION_MILLI; + *lastTs = taosGetTimestamp(precision); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock) { SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; STranslateContext cxt = {0}; SInterval interval = {0}; @@ -8279,15 +8352,11 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void if (TSDB_CODE_SUCCESS == code) { code = buildIntervalForCreateStream(pStmt, &interval); } + if (TSDB_CODE_SUCCESS == code) { - if (pResRow && pResRow[0]) { - lastTs = *(int64_t*)pResRow[0]; - } else if (interval.interval > 0) { - lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision); - } else { - lastTs = taosGetTimestampMs(); - } + code = createStreamReqVersionInfo(pBlock, &pStmt->pReq->pVgroupVerList, &lastTs, &interval); } + if (TSDB_CODE_SUCCESS == code) { if (interval.interval > 0) { pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision); @@ -8296,9 +8365,11 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void } code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq); } + if (TSDB_CODE_SUCCESS == code) { code = setQuery(&cxt, pQuery); } + setRefreshMeta(&cxt, pQuery); destroyTranslateContext(&cxt); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 1fa4c624a7..92e25f3ee6 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -233,14 +233,17 @@ int32_t qContinueParseSql(SParseContext* pCxt, struct SCatalogReq* pCatalogReq, return parseInsertSql(pCxt, &pQuery, pCatalogReq, pMetaData); } -int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pResRow) { +int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pQuery->pRoot)) { - case QUERY_NODE_CREATE_STREAM_STMT: - code = translatePostCreateStream(pCxt, pQuery, pResRow); + case QUERY_NODE_CREATE_STREAM_STMT: { + code = translatePostCreateStream(pCxt, pQuery, pBlock); break; - case QUERY_NODE_CREATE_INDEX_STMT: - code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow); + } + case QUERY_NODE_CREATE_INDEX_STMT: { + code = translatePostCreateSmaIndex(pCxt, pQuery, pBlock); + break; + } default: break; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 26552f25b4..62fda148cd 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1815,9 +1815,8 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p return TSDB_CODE_SUCCESS; } -int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - char* p = colDataGetVarData(pInput->columnData, 0); - +int32_t qPseudoTagFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + char *p = colDataGetData(pInput->columnData, 0); int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); if (code) { return code; @@ -1826,31 +1825,6 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } -#ifdef BUILD_NO_CALL -int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - char* p = colDataGetNumData(pInput->columnData, 0); - - int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); - if (code) { - return code; - } - - pOutput->numOfRows += pInput->numOfRows; - return TSDB_CODE_SUCCESS; -} - -int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - char* p = colDataGetNumData(pInput->columnData, 0); - - int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true); - if (code) { - return code; - } - - pOutput->numOfRows += pInput->numOfRows; - return TSDB_CODE_SUCCESS; -} -#endif /** Aggregation functions **/ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0534f2c30c..fd9c8b61d1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -123,7 +123,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); +int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b0170d5083..7fb8095acd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -388,10 +388,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, p, pStreamTask->status.schedStatus); - pTimeWindow->skey = INT64_MIN; - qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); - stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter", - pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer); + streamTaskResetTimewindowFilter(pStreamTask); } else { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } @@ -420,10 +417,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ASSERT(pTask->status.appendTranstateBlock == 1); int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskFillHistoryFinished(pTask); - } - if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5e53a921b9..db74ce9897 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -590,11 +590,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - if (pTask->info.fillHistory == 1) { - stDebug("s-task:0x%x initial nextProcessVer is set to 1 for fill-history task", pTask->id.taskId); - ver = 1; - } - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); return -1; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 2f5bca8ed9..ee98bc801b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -595,8 +595,6 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) SDataRange* pRange = &pHTask->dataRange; // the query version range should be limited to the already processed data - pRange->range.minVer = 0; - pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1; pHTask->execInfo.init = taosGetTimestampMs(); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -843,7 +841,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } } -int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { +int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); } @@ -854,8 +852,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe int64_t walScanStartVer = pRange->maxVer + 1; if (walScanStartVer > nextProcessVer - 1) { - // no input data yet. no need to execute the secondary scan while stream task halt - streamTaskFillHistoryFinished(pTask); stDebug( "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "related stream task currentVer:%" PRId64, @@ -965,25 +961,12 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { return; } - int64_t ekey = 0; - if (pRange->window.ekey < INT64_MAX) { - ekey = pRange->window.ekey + 1; - } else { - ekey = pRange->window.ekey; - } - - int64_t ver = pRange->range.minVer; - - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; - - stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64 + stDebug("s-task:%s level:%d related fill-history task exists, stream task timeWindow:%" PRId64 " - %" PRId64 ", verRang:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX); + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); - SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX}; + SVersionRange verRange = pRange->range; STimeWindow win = pRange->window; streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 3018894132..fef733c9f3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -79,7 +79,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { return pEpInfo; } -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam, +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { @@ -92,6 +92,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory pTask->ver = SSTREAM_TASK_VER; pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; + pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; pTask->info.triggerParam = triggerParam; @@ -115,6 +116,8 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory ASSERT(hasFillhistory); } + epsetAssign(&(pTask->info.mnodeEpset), pEpset); + addToTaskset(pTaskList, pTask); return pTask; } @@ -458,16 +461,18 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; - pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.timerActive = 0; + + pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; + pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->inputq.queue = streamQueueOpen(512 << 10); pTask->outputq.queue = streamQueueOpen(512 << 10); - if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return TSDB_CODE_OUT_OF_MEMORY; } + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->status.timerActive = 0; pTask->status.pSM = streamCreateStateMachine(pTask); if (pTask->status.pSM == NULL) { stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr, @@ -476,29 +481,34 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->execInfo.created = taosGetTimestampMs(); - pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; - pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; - pTask->pMeta = pMeta; + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + SDataRange* pRange = &pTask->dataRange; - pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint - pTask->chkInfo.processedVer = ver - 1; // already processed version + // only set the version info for stream tasks without fill-history task + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { + pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint + pChkInfo->processedVer = ver - 1; // already processed version + pChkInfo->nextProcessVer = ver; // next processed version - pTask->chkInfo.nextProcessVer = ver; // next processed version - pTask->dataRange.range.maxVer = ver; - pTask->dataRange.range.minVer = ver; - pTask->pMsgCb = pMsgCb; - pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); - - pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); - if (pTask->outputInfo.pTokenBucket == NULL) { - stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; + pRange->range.maxVer = ver; + pRange->range.minVer = ver; + } else { + if (pTask->info.fillHistory == 1) { + pChkInfo->checkpointVer = pRange->range.maxVer; + pChkInfo->processedVer = pRange->range.maxVer; + pChkInfo->nextProcessVer = pRange->range.maxVer + 1; + } else { + pChkInfo->checkpointVer = pRange->range.minVer - 1; + pChkInfo->processedVer = pRange->range.minVer - 1; + pChkInfo->nextProcessVer = pRange->range.minVer; + } + } } - // 2MiB per second for sink task - // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); + pTask->pMeta = pMeta; + pTask->pMsgCb = pMsgCb; + pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); @@ -514,10 +524,22 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } taosThreadMutexInit(&pTask->lock, &attr); + taosThreadMutexAttrDestroy(&attr); streamTaskOpenAllUpstreamInput(pTask); - pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); - if (pTask->outputInfo.pDownstreamUpdateList == NULL) { + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); + if (pOutputInfo->pTokenBucket == NULL) { + stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + // 2MiB per second for sink task + // 50 times sink operator per second + streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); + pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); + if (pOutputInfo->pDownstreamUpdateList == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -527,16 +549,16 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) { return 0; + } + + int32_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__TABLE) { + return 0; + } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { + return 1; } else { - int32_t type = pTask->outputInfo.type; - if (type == TASK_OUTPUT__TABLE) { - return 0; - } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { - return 1; - } else { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - return taosArrayGetSize(vgInfo); - } + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + return taosArrayGetSize(vgInfo); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1671d78ed2..83e71c42bc 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -105,7 +105,7 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (pTask->hTaskInfo.haltVer == -1) { - pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1; + pTask->hTaskInfo.haltVer = pTask->dataRange.range.minVer; } }