feat: rollup data submit
This commit is contained in:
parent
2a042daafa
commit
c7ca57f557
|
@ -79,11 +79,11 @@ typedef enum {
|
|||
} ETsdbSmaType;
|
||||
|
||||
typedef enum {
|
||||
TSDB_RSMA_RETENTION_0 = 0,
|
||||
TSDB_RSMA_RETENTION_1 = 1,
|
||||
TSDB_RSMA_RETENTION_2 = 2,
|
||||
TSDB_RSMA_RETENTION_MAX = 3
|
||||
} ERSmaRetention;
|
||||
TSDB_RETENTION_L0 = 0,
|
||||
TSDB_RETENTION_L1 = 1,
|
||||
TSDB_RETENTION_L2 = 2,
|
||||
TSDB_RETENTION_MAX = 3
|
||||
} ERetentionLevel;
|
||||
|
||||
extern char *qtypeStr[];
|
||||
|
||||
|
|
|
@ -225,6 +225,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
|||
|
||||
void blockDebugShowData(const SArray* dataBlocks);
|
||||
|
||||
void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||
}
|
||||
|
|
|
@ -202,6 +202,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||
|
||||
// sync integration
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL)
|
||||
|
|
|
@ -453,7 +453,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
|
|||
// all fit in
|
||||
*stopIndex = numOfRows - 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
|
||||
|
@ -940,8 +939,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
copyBackToBlock(pDataBlock, pCols);
|
||||
int64_t p4 = taosGetTimestampUs();
|
||||
|
||||
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
|
||||
p3 - p2, p4 - p3, rows);
|
||||
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
|
||||
", rows:%d\n",
|
||||
p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
|
||||
destroyTupleIndex(index);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1464,3 +1464,122 @@ void blockDebugShowData(const SArray* dataBlocks) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief TODO: Assume that the final generated result it less than 3M
|
||||
*
|
||||
* @param pReq
|
||||
* @param pDataBlocks
|
||||
* @param vgId
|
||||
* @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in
|
||||
* SDataBlock->info.uid
|
||||
* @param suid // TODO: check with Liao whether suid response is reasonable
|
||||
*
|
||||
* TODO: colId should be set
|
||||
*/
|
||||
void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid,
|
||||
tb_uid_t suid) {
|
||||
int32_t sz = taosArrayGetSize(pDataBlocks);
|
||||
int32_t bufSize = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
|
||||
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols));
|
||||
bufSize += sizeof(SSubmitBlk);
|
||||
}
|
||||
|
||||
ASSERT(bufSize < 3 * 1024 * 1024);
|
||||
|
||||
*pReq = taosMemoryCalloc(1, bufSize);
|
||||
void* pDataBuf = *pReq;
|
||||
|
||||
int32_t msgLen = sizeof(SSubmitReq);
|
||||
int32_t numOfBlks = 0;
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, 0); // TODO: use the latest version
|
||||
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
|
||||
int32_t colNum = pDataBlock->info.numOfCols;
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int64_t groupId = pDataBlock->info.groupId;
|
||||
|
||||
if(rb.nCols != colNum) {
|
||||
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
|
||||
}
|
||||
|
||||
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
|
||||
pSubmitBlk->suid = suid;
|
||||
pSubmitBlk->uid = uid;
|
||||
pSubmitBlk->numOfRows = rows;
|
||||
|
||||
++numOfBlks;
|
||||
|
||||
msgLen += sizeof(SSubmitBlk);
|
||||
int32_t dataLen = 0;
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
|
||||
printf("|");
|
||||
bool isStartKey = false;
|
||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (!isStartKey) {
|
||||
isStartKey = true;
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 0, 0);
|
||||
} else {
|
||||
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, 8, k);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, 8, k);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
printf("the column type %" PRIi16 " is defined but not implemented yet\n", pColInfoData->info.type);
|
||||
TASSERT(0);
|
||||
break;
|
||||
default:
|
||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||
tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, 8, k);
|
||||
} else {
|
||||
printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||
TASSERT(0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
dataLen += TD_ROW_LEN(rb.pBuf);
|
||||
}
|
||||
pSubmitBlk->dataLen = dataLen;
|
||||
msgLen += pSubmitBlk->dataLen;
|
||||
}
|
||||
|
||||
(*pReq)->length = msgLen;
|
||||
|
||||
(*pReq)->header.vgId = htonl(vgId);
|
||||
(*pReq)->header.contLen = htonl(msgLen);
|
||||
(*pReq)->length = (*pReq)->header.contLen;
|
||||
(*pReq)->numOfBlocks = htonl(numOfBlks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
|
||||
while (numOfBlks--) {
|
||||
int32_t dataLen = blk->dataLen;
|
||||
blk->uid = htobe64(blk->uid);
|
||||
blk->suid = htobe64(blk->suid);
|
||||
blk->padding = htonl(blk->padding);
|
||||
blk->sversion = htonl(blk->sversion);
|
||||
blk->dataLen = htonl(blk->dataLen);
|
||||
blk->schemaLen = htonl(blk->schemaLen);
|
||||
blk->numOfRows = htons(blk->numOfRows);
|
||||
blk = (SSubmitBlk*)(blk->data + dataLen);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,6 +284,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
|||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#define TSDB_VNODE_SMA_DEBUG // TODO: evaluate to remove the macro and the relative codes
|
||||
|
||||
// vnode
|
||||
typedef struct SVnode SVnode;
|
||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||
|
@ -145,7 +145,7 @@ struct STsdbCfg {
|
|||
int32_t keep2;
|
||||
// TODO: save to tsdb cfg file
|
||||
int8_t type; // ETsdbType
|
||||
SRetention retentions[TSDB_RSMA_RETENTION_MAX];
|
||||
SRetention retentions[TSDB_RETENTION_MAX];
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -72,6 +72,7 @@ struct STsdb {
|
|||
char *path;
|
||||
SVnode *pVnode;
|
||||
bool repoLocked;
|
||||
int8_t level; // retention level
|
||||
TdThreadMutex mutex;
|
||||
STsdbMemTable *mem;
|
||||
STsdbMemTable *imem;
|
||||
|
|
|
@ -124,6 +124,7 @@ int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
|
|||
void tsdbUidStoreDestory(STbUidStore* pStore);
|
||||
void* tsdbUidStoreFree(STbUidStore* pStore);
|
||||
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
|
||||
int32_t tsdbProcessSubmitReq(STsdb* pTsdb, int64_t version, void* pReq);
|
||||
|
||||
typedef struct {
|
||||
int8_t streamType; // sma or other
|
||||
|
@ -181,6 +182,7 @@ struct SVnode {
|
|||
|
||||
struct STbUidStore {
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock
|
||||
SArray* tbUids;
|
||||
SHashObj* uidHash;
|
||||
};
|
||||
|
|
|
@ -27,7 +27,13 @@ static const char *TSDB_FNAME_SUFFIX[] = {
|
|||
"rsma", // TSDB_FILE_RSMA
|
||||
};
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
||||
static const char *TSDB_DIR_NAME[] = {
|
||||
"tsdb",
|
||||
"rsma1",
|
||||
"rsma2",
|
||||
};
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char* dname, char *fname);
|
||||
// static int tsdbRollBackMFile(SMFile *pMFile);
|
||||
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
|
||||
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
|
||||
|
@ -45,7 +51,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
|
|||
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pDFile->info.fver = tsdbGetDFSVersion(ftype);
|
||||
|
||||
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, fname);
|
||||
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_DIR_NAME[pRepo->level], fname);
|
||||
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
|
||||
}
|
||||
|
||||
|
@ -431,14 +437,15 @@ int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname) {
|
||||
ASSERT(ftype != TSDB_FILE_MAX);
|
||||
|
||||
if (ftype < TSDB_FILE_MAX) {
|
||||
if (ver == 0) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s", vid, dname, vid, fid,
|
||||
TSDB_FNAME_SUFFIX[ftype]);
|
||||
} else {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-ver%" PRIu32, vid, vid, fid,
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s-ver%" PRIu32, vid, dname, vid, fid,
|
||||
TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -15,21 +15,21 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir);
|
||||
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
|
||||
|
||||
int tsdbOpen(SVnode *pVnode, int8_t type) {
|
||||
switch (type) {
|
||||
case TSDB_TYPE_TSDB:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR);
|
||||
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
|
||||
case TSDB_TYPE_TSMA:
|
||||
ASSERT(0);
|
||||
break;
|
||||
case TSDB_TYPE_RSMA_L0:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR);
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
|
||||
case TSDB_TYPE_RSMA_L1:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR);
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1);
|
||||
case TSDB_TYPE_RSMA_L2:
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR);
|
||||
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2);
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
|
@ -37,7 +37,17 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pVnode
|
||||
* @param type
|
||||
* @param ppTsdb
|
||||
* @param dir
|
||||
* @param level retention level
|
||||
* @return int
|
||||
*/
|
||||
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
|
||||
STsdb *pTsdb = NULL;
|
||||
int slen = 0;
|
||||
|
||||
|
@ -55,6 +65,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
|
|||
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
||||
dir);
|
||||
pTsdb->pVnode = pVnode;
|
||||
pTsdb->level = level;
|
||||
pTsdb->repoLocked = false;
|
||||
taosThreadMutexInit(&pTsdb->mutex, NULL);
|
||||
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb));
|
||||
|
@ -79,6 +90,7 @@ _err:
|
|||
|
||||
int tsdbClose(STsdb *pTsdb) {
|
||||
if (pTsdb) {
|
||||
// TODO: destroy mem/imem
|
||||
tsdbCloseFS(pTsdb);
|
||||
tsdbFreeFS(pTsdb->fs);
|
||||
taosMemoryFree(pTsdb);
|
||||
|
|
|
@ -105,7 +105,7 @@ typedef struct {
|
|||
|
||||
#define RSMA_TASK_INFO_HASH_SLOT 8
|
||||
struct SRSmaInfo {
|
||||
void *taskInfo[TSDB_RSMA_RETENTION_2]; // qTaskInfo_t
|
||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||
};
|
||||
|
||||
struct SSmaStat {
|
||||
|
@ -127,7 +127,7 @@ static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
|
||||
for (int32_t i = 0; i < TSDB_RSMA_RETENTION_MAX; ++i) {
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
if (pInfo->taskInfo[i]) {
|
||||
tsdbFreeTaskHandle(pInfo->taskInfo[i]);
|
||||
}
|
||||
|
@ -175,7 +175,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
|
|||
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
|
||||
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
|
||||
qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention);
|
||||
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level);
|
||||
// mgmt interface
|
||||
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
|
||||
|
||||
|
@ -1976,6 +1976,7 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
|||
|
||||
if (!pBlock) break;
|
||||
tsdbUidStorePut(pStore, msgIter.suid, NULL);
|
||||
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
|
@ -1983,13 +1984,15 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
|
||||
qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention) {
|
||||
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid,
|
||||
tb_uid_t uid, int8_t level) {
|
||||
SArray *pResult = NULL;
|
||||
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), retention, taskInfo,
|
||||
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
|
||||
suid);
|
||||
|
||||
qSetStreamInput(taskInfo, pMsg, inputType);
|
||||
while (1) {
|
||||
SSDataBlock *output;
|
||||
SSDataBlock *output = NULL;
|
||||
uint64_t ts;
|
||||
if (qExecTask(taskInfo, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
|
@ -2010,8 +2013,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
|
|||
|
||||
if (taosArrayGetSize(pResult) > 0) {
|
||||
blockDebugShowData(pResult);
|
||||
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2);
|
||||
SSubmitReq *pReq = NULL;
|
||||
buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode),uid, suid);
|
||||
tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq);
|
||||
} else {
|
||||
tsdbWarn("vgId:%d no rsma_1 data generated since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||
tsdbWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", REPO_ID(pTsdb), level, tstrerror(terrno));
|
||||
}
|
||||
|
||||
taosArrayDestroy(pResult);
|
||||
|
@ -2019,13 +2026,15 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) {
|
||||
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
|
||||
if (!pEnv) {
|
||||
// only applicable when rsma env exists
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ASSERT(uid != 0); // TODO: remove later
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SRSmaInfo *pRSmaInfo = NULL;
|
||||
|
||||
|
@ -2037,8 +2046,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_ui
|
|||
}
|
||||
|
||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], suid, TSDB_RSMA_RETENTION_1);
|
||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], suid, TSDB_RSMA_RETENTION_2);
|
||||
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
|
||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
|
||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
|
||||
taosMemoryFree(pTSchema);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2056,12 +2068,12 @@ int32_t tsdbTriggerRSma(STsdb *pTsdb, void *pMsg, int32_t inputType) {
|
|||
tsdbFetchSubmitReqSuids(pMsg, &uidStore);
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid);
|
||||
tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid, uidStore.uid);
|
||||
|
||||
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
|
||||
while (pIter) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid);
|
||||
tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid, 0);
|
||||
pIter = taosHashIterate(uidStore.uidHash, pIter);
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
#ifdef TSDB_VNODE_SMA_DEBUG
|
||||
if (pCfg->tsdbCfg.retentions[0].freq > 0) {
|
||||
int32_t nRetention = 1;
|
||||
if (pCfg->tsdbCfg.retentions[1].freq > 0) {
|
||||
|
@ -87,7 +86,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
|
@ -135,11 +133,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||
#ifdef TSDB_VNODE_SMA_DEBUG
|
||||
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
|
||||
int nRetention = tjsonGetArraySize(pNodeRetentions);
|
||||
ASSERT(nRetention <= TSDB_RSMA_RETENTION_MAX);
|
||||
|
||||
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
|
||||
if (nRetention > TSDB_RETENTION_MAX) {
|
||||
nRetention = TSDB_RETENTION_MAX;
|
||||
}
|
||||
for (int32_t i = 0; i < nRetention; ++i) {
|
||||
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
|
||||
ASSERT(pNodeRetention != NULL);
|
||||
|
@ -148,7 +146,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep);
|
||||
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit);
|
||||
}
|
||||
#endif
|
||||
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
|
||||
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
|
||||
|
|
|
@ -47,10 +47,27 @@ int vnodeBegin(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
// begin tsdb
|
||||
if (vnodeIsRollup(pVnode)) {
|
||||
if (tsdbBegin(VND_RSMA0(pVnode)) < 0) {
|
||||
vError("vgId:%d failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbBegin(VND_RSMA1(pVnode)) < 0) {
|
||||
vError("vgId:%d failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbBegin(VND_RSMA2(pVnode)) < 0) {
|
||||
vError("vgId:%d failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbBegin(pVnode->pTsdb) < 0) {
|
||||
vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -461,7 +461,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
SSubmitRsp rsp = {0};
|
||||
|
||||
pRsp->code = 0;
|
||||
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
|
||||
// handle the request
|
||||
if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) {
|
||||
pRsp->code = terrno;
|
||||
|
@ -470,12 +470,28 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
|
||||
// pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
||||
// vnodeProcessSubmitReq(pVnode, ptr, pRsp);
|
||||
// tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
|
||||
// encode the response (TODO)
|
||||
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
||||
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
||||
pRsp->contLen = sizeof(SSubmitRsp);
|
||||
|
||||
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
||||
if(!pReq) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
|
||||
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue