diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8e00297564..d2694c860d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -44,7 +44,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid); + int64_t suid, SArray **ppResList, int8_t *streamFlushed); static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); @@ -591,17 +591,25 @@ _end: } static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid) { + int64_t suid, SArray **ppResList, int8_t *streamFlushed) { int32_t code = 0; int32_t lino = 0; SSDataBlock *output = NULL; + SArray *pResList = NULL; - SArray *pResList = taosArrayInit(1, POINTER_BYTES); - if (pResList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); + if (!(*ppResList)) { + pResList = taosArrayInit(1, POINTER_BYTES); + if (pResList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *ppResList = pResList; + } else { + pResList = *ppResList; } + taosArrayClear(pResList); + while (1) { uint64_t ts; bool hasMore = false; @@ -622,6 +630,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { output = taosArrayGetP(pResList, i); + if(output->info.type == STREAM_CHECKPOINT) { + if (streamFlushed) *streamFlushed = 1; + continue; + } smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), output->info.id.uid, output->info.id.groupId, output->info.rows); @@ -659,7 +671,6 @@ _exit: } else { smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); } - taosArrayDestroy(pResList); qCleanExecTaskBlockBuf(taskInfo); return code; } @@ -756,6 +767,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, ERsmaExecType type, int8_t level) { int32_t idx = level - 1; void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); + SArray *pResList = NULL; if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, @@ -784,8 +796,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL); + taosArrayDestroy(pResList); return TSDB_CODE_SUCCESS; } @@ -1062,90 +1075,6 @@ _err: return code; } -static int32_t tdRSmaExecVerifyCheckPoint(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, SArray **ppResList, int8_t *streamFlushed) { - int32_t code = 0; - int32_t lino = 0; - SSDataBlock *output = NULL; - SArray *pResList = NULL; - - if (!(*ppResList)) { - pResList = taosArrayInit(1, POINTER_BYTES); - if (pResList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *ppResList = pResList; - } else { - pResList = *ppResList; - } - - while (1) { - uint64_t ts; - bool hasMore = false; - code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); - if (code == TSDB_CODE_QRY_IN_EXEC) { - code = 0; - break; - } - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pResList) == 0) { - break; - } -#if 0 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); - blockDebugShowDataBlocks(pResList, flag); -#endif - for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { - output = taosArrayGetP(pResList, i); - if(output->info.type == STREAM_CHECKPOINT) { - if (streamFlushed) *streamFlushed = 1; - continue; - } - smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), - output->info.id.uid, output->info.id.groupId, output->info.rows); - - STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); - SSubmitReq2 *pReq = NULL; - - // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { - code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; - tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); - taosMemoryFree(pReq); - TSDB_CHECK_CODE(code, lino, _exit); - } - - smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, - SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); - - if (pReq) { - tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); - taosMemoryFree(pReq); - } - } - } -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64 - ", ver:%" PRIi64, - SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1, - output ? output->info.version : -1); - } else { - smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); - } - taosArrayDestroy(pResList); - qCleanExecTaskBlockBuf(taskInfo); - return code; -} - int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t code = 0; int32_t lino = 0; @@ -1177,22 +1106,31 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } // stream state: process checkpoint response in async mode int32_t nStreamFlushed = 0; - while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - continue; - } - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { - int8_t streamFlushed = 0; - code = tdRSmaExecVerifyCheckPoint(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, - pRSmaInfo->suid, &pResList, &streamFlushed); - TSDB_CHECK_CODE(code, lino, _exit); - if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { - goto _checkpoint; + int32_t nMSleep = 0; + while (true) { + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { + int8_t streamFlushed = 0; + code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, + pRSmaInfo->suid, &pResList, &streamFlushed); + TSDB_CHECK_CODE(code, lino, _exit); + if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { + smaInfo("%s:%d checkpoint ready, %d ms consumed, received/total: %d/%d", __func__, __LINE__, nMSleep, + nStreamFlushed, nTaskInfo); + goto _checkpoint; + } } } } + taosMsleep(1); + ++nMSleep; + smaInfo("%s:%d wait for checkpoint ready, %d ms elapsed, received/total: %d/%d", __func__, __LINE__, nMSleep, + nStreamFlushed, nTaskInfo); } // stream state: build checkpoint in backend @@ -1207,7 +1145,6 @@ _checkpoint: SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; - // adaption for API streamTaskBuildCheckpoint atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); pTask->checkpointingId = taosGetTimestampNs(); code = streamTaskBuildCheckpoint(pTask); @@ -1224,6 +1161,7 @@ _checkpoint: } _exit: + taosArrayDestroy(pResList); if (code) { smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } @@ -1355,6 +1293,7 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { */ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + SArray *pResList = NULL; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); if (pItem->fetchLevel) { @@ -1385,7 +1324,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL) < 0) { goto _err; } @@ -1399,8 +1338,10 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } _end: + taosArrayDestroy(pResList); return TSDB_CODE_SUCCESS; _err: + taosArrayDestroy(pResList); return TSDB_CODE_FAILED; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 753d3e680c..a1bd5a7483 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1967,9 +1967,19 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } + for(int32_t i=0; i< taosArrayGetSize(pTableList->pTableList); ++i) { + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, i); + if(pKeyInfo->uid == uid) { + assert(0); + } + } + STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; taosArrayPush(pTableList->pTableList, &keyInfo); + if(taosHashGet(pTableList->map, &uid, sizeof(uid))) { + assert(0); + } int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));