enh: rsma logic

This commit is contained in:
kailixu 2023-11-02 11:03:18 +08:00
parent 9649e87cab
commit 76536e1c82
2 changed files with 58 additions and 107 deletions

View File

@ -44,7 +44,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems); static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid); int64_t suid, SArray **ppResList, int8_t *streamFlushed);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
@ -591,16 +591,24 @@ _end:
} }
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) { int64_t suid, SArray **ppResList, int8_t *streamFlushed) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
SArray *pResList = NULL;
SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (!(*ppResList)) {
pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) { if (pResList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
*ppResList = pResList;
} else {
pResList = *ppResList;
}
taosArrayClear(pResList);
while (1) { while (1) {
uint64_t ts; uint64_t ts;
@ -622,6 +630,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif #endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
output = taosArrayGetP(pResList, i); output = taosArrayGetP(pResList, i);
if(output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1;
continue;
}
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows); output->info.id.uid, output->info.id.groupId, output->info.rows);
@ -659,7 +671,6 @@ _exit:
} else { } else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
} }
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo); qCleanExecTaskBlockBuf(taskInfo);
return code; return code;
} }
@ -756,6 +767,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
ERsmaExecType type, int8_t level) { ERsmaExecType type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
SArray *pResList = NULL;
if (!qTaskInfo) { if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
@ -784,8 +796,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL);
taosArrayDestroy(pResList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1062,90 +1075,6 @@ _err:
return code; return code;
} }
static int32_t tdRSmaExecVerifyCheckPoint(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, SArray **ppResList, int8_t *streamFlushed) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock *output = NULL;
SArray *pResList = NULL;
if (!(*ppResList)) {
pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*ppResList = pResList;
} else {
pResList = *ppResList;
}
while (1) {
uint64_t ts;
bool hasMore = false;
code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code == TSDB_CODE_QRY_IN_EXEC) {
code = 0;
break;
}
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pResList) == 0) {
break;
}
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag);
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
output = taosArrayGetP(pResList, i);
if(output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1;
continue;
}
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
if (pReq) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
} else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
}
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo);
return code;
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -1177,6 +1106,8 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
// stream state: process checkpoint response in async mode // stream state: process checkpoint response in async mode
int32_t nStreamFlushed = 0; int32_t nStreamFlushed = 0;
int32_t nMSleep = 0;
while (true) {
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
@ -1185,15 +1116,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0; int8_t streamFlushed = 0;
code = tdRSmaExecVerifyCheckPoint(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema,
pRSmaInfo->suid, &pResList, &streamFlushed); pRSmaInfo->suid, &pResList, &streamFlushed);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
smaInfo("%s:%d checkpoint ready, %d ms consumed, received/total: %d/%d", __func__, __LINE__, nMSleep,
nStreamFlushed, nTaskInfo);
goto _checkpoint; goto _checkpoint;
} }
} }
} }
} }
taosMsleep(1);
++nMSleep;
smaInfo("%s:%d wait for checkpoint ready, %d ms elapsed, received/total: %d/%d", __func__, __LINE__, nMSleep,
nStreamFlushed, nTaskInfo);
}
// stream state: build checkpoint in backend // stream state: build checkpoint in backend
_checkpoint: _checkpoint:
@ -1207,7 +1145,6 @@ _checkpoint:
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamTask) { if (pItem && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask; SStreamTask *pTask = pItem->pStreamTask;
// adaption for API streamTaskBuildCheckpoint
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->checkpointingId = taosGetTimestampNs(); pTask->checkpointingId = taosGetTimestampNs();
code = streamTaskBuildCheckpoint(pTask); code = streamTaskBuildCheckpoint(pTask);
@ -1224,6 +1161,7 @@ _checkpoint:
} }
_exit: _exit:
taosArrayDestroy(pResList);
if (code) { if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
@ -1355,6 +1293,7 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
*/ */
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
SArray *pResList = NULL;
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (pItem->fetchLevel) { if (pItem->fetchLevel) {
@ -1385,7 +1324,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
} }
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL) < 0) {
goto _err; goto _err;
} }
@ -1399,8 +1338,10 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
} }
_end: _end:
taosArrayDestroy(pResList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
taosArrayDestroy(pResList);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -1967,9 +1967,19 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
} }
for(int32_t i=0; i< taosArrayGetSize(pTableList->pTableList); ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, i);
if(pKeyInfo->uid == uid) {
assert(0);
}
}
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
taosArrayPush(pTableList->pTableList, &keyInfo); taosArrayPush(pTableList->pTableList, &keyInfo);
if(taosHashGet(pTableList->map, &uid, sizeof(uid))) {
assert(0);
}
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));