merge main

This commit is contained in:
kailixu 2023-05-12 13:55:22 +08:00
commit 0c9aa5bf3f
5 changed files with 55 additions and 83 deletions

View File

@ -755,6 +755,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161) #define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162) #define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163) #define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163)
#define TSDB_CODE_RSMA_RESULT TAOS_DEF_ERROR_CODE(0, 0x3164)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)

View File

@ -30,6 +30,8 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore); static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
@ -44,7 +46,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid); int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); // static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
@ -64,10 +66,7 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
} else {
smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
} }
// TODO: clear files related to qTaskInfo?
} }
/** /**
@ -95,16 +94,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (isDeepFree && pInfo->taskInfo[i]) { if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1);
} }
#if 0 #if 0
if (pInfo->iTaskInfo[i]) { if (pInfo->iTaskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
SMA_VID(pSma), pInfo->suid, i + 1);
} }
#endif #endif
} }
@ -140,11 +133,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
} }
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); *pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
if (*pStore == NULL) { if (*pStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -314,11 +302,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) {
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_FAILED;
}
SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));
@ -380,13 +363,13 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
if (!(pRSmaInfo->queue = taosOpenQueue())) {
if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) ||
tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 ||
tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
goto _err; goto _err;
} }
if (!(pRSmaInfo->qall = taosAllocateQall())) {
goto _err;
}
#if 0 #if 0
if (!(pRSmaInfo->iQueue = taosOpenQueue())) { if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
goto _err; goto _err;
@ -395,13 +378,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err; goto _err;
} }
#endif #endif
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
goto _err;
}
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
goto _err;
}
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err; goto _err;
@ -577,15 +553,12 @@ void *tdUidStoreFree(STbUidStore *pStore) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) { if (pReq) {
terrno = TSDB_CODE_INVALID_PTR; SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
return TSDB_CODE_FAILED; // spin lock for race condition during insert data
} if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; }
// spin lock for race condition during insert data
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -607,7 +580,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
return 0; return 0;
} }
#if 0
/** /**
* @brief retention of rsma1/rsma2 * @brief retention of rsma1/rsma2
* *
@ -631,48 +603,40 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
_end: _end:
return code; return code;
} }
#endif
#if 0
static void tdBlockDataDestroy(SArray *pBlockArr) { static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
blockDataDestroy(taosArrayGetP(pBlockArr, i)); blockDataDestroy(taosArrayGetP(pBlockArr, i));
} }
taosArrayDestroy(pBlockArr); taosArrayDestroy(pBlockArr);
} }
#endif
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid) { int64_t suid) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock *output = NULL;
SArray *pResList = taosArrayInit(1, POINTER_BYTES); SArray *pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) { if (pResList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
while (1) { while (1) {
uint64_t ts; uint64_t ts;
bool hasMore = false; bool hasMore = false;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) {
if (code == TSDB_CODE_QRY_IN_EXEC) { code = 0;
break; break;
} else {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr(code));
goto _err;
}
} }
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pResList) == 0) { if (taosArrayGetSize(pResList) == 0) {
if (terrno == 0) {
// smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else {
smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
goto _err;
}
break; break;
} else {
smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
} }
#if 0 #if 0
char flag[10] = {0}; char flag[10] = {0};
@ -680,28 +644,25 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
blockDebugShowDataBlocks(pResList, flag); blockDebugShowDataBlocks(pResList, flag);
#endif #endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid, output = taosArrayGetP(pResList, i);
output->info.id.groupId, output->info.rows); smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
" failed since %s", TSDB_CHECK_CODE(code, lino, _exit);
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq); taosMemoryFree(pReq);
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 TSDB_CHECK_CODE(code, lino, _exit);
" failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());
goto _err;
} }
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
@ -713,15 +674,18 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
} }
} }
} }
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
} else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
}
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo); qCleanExecTaskBlockBuf(taskInfo);
return TSDB_CODE_SUCCESS; return code;
_err:
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo);
return TSDB_CODE_FAILED;
} }
/** /**
@ -910,6 +874,7 @@ _exit:
* @param pInfo * @param pInfo
* @return int32_t * @return int32_t
*/ */
#if 0
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -961,6 +926,7 @@ _exit:
metaReaderClear(&mr); metaReaderClear(&mr);
return code; return code;
} }
#endif
/** /**
* @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
@ -996,12 +962,14 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
#if 0
if (!pRSmaInfo->taskInfo[0]) { if (!pRSmaInfo->taskInfo[0]) {
if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) { if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
} }
#endif
tdRefRSmaInfo(pSma, pRSmaInfo); tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) {

View File

@ -82,6 +82,9 @@ static int32_t vnodeRetentionTask(void *param) {
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now); code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = smaDoRetention(pInfo->pVnode->pSma, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info // commit info
vnodeCommitInfo(dir); vnodeCommitInfo(dir);

View File

@ -624,6 +624,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state c
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_RESULT, "Rsma result error")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")

View File

@ -18,8 +18,7 @@ if $rows != 2 then
endi endi
print =============== create child table print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang"); sql create table ct1 using stb tags("BeiJing", "ChaoYang") ct_1 using stb1 tags("BeiJing", "ChaoYang");
sql create table ct_1 using stb1 tags("BeiJing", "ChaoYang");
sql show tables sql show tables
if $rows != 2 then if $rows != 2 then