refactor: row and submit msg of rsma/tsdb
This commit is contained in:
parent
4828a19e10
commit
d3a88ea517
|
@ -266,7 +266,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
|
|||
// for debug
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
|
||||
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t suid);
|
||||
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "tlog.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MALLOC_ALIGN_BYTES 32
|
||||
#define MALLOC_ALIGN_BYTES 32
|
||||
|
||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||
ASSERT(pColumnInfoData != NULL);
|
||||
|
@ -38,7 +38,8 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num
|
|||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
|
||||
} else {
|
||||
return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows);
|
||||
return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
|
||||
BitmapLen(numOfRows);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,7 +280,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
|
|||
pColumnInfoData->varmeta.allocLen = len + oldLen;
|
||||
}
|
||||
|
||||
if (pColumnInfoData->pData && pSource->pData) { // TD-20382
|
||||
if (pColumnInfoData->pData && pSource->pData) { // TD-20382
|
||||
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
|
||||
}
|
||||
pColumnInfoData->varmeta.length = len + oldLen;
|
||||
|
@ -1157,7 +1158,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
|
|||
}
|
||||
|
||||
// todo temporarily disable it
|
||||
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) {
|
||||
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
|
||||
bool clearPayload) {
|
||||
ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/);
|
||||
if (numOfRows <= pBlockInfo->capacity) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2009,9 +2011,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
* @param suid
|
||||
*
|
||||
*/
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
|
||||
#if 0
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t suid) {
|
||||
int32_t bufSize = sizeof(SSubmitReq);
|
||||
int32_t bufSize = sizeof(SSubmitReq2);
|
||||
int32_t sz = 1;
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
|
||||
|
@ -2174,6 +2177,154 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema,
|
||||
int32_t vgId, tb_uid_t suid) {
|
||||
SSubmitReq2* pReq = *ppReq;
|
||||
SArray* pVals = NULL;
|
||||
int32_t bufSize = sizeof(SSubmitReq2);
|
||||
int32_t numOfBlks = 0;
|
||||
int32_t sz = 1;
|
||||
|
||||
if (!(pReq = taosMemoryMalloc(bufSize))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
|
||||
if (colNum <= 1) { // invalid if only with TS col
|
||||
continue;
|
||||
}
|
||||
|
||||
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||
|
||||
if (!pTbData) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*));
|
||||
pTbData->suid = suid;
|
||||
pTbData->uid = pDataBlock->info.id.groupId;
|
||||
pTbData->sver = pTSchema->version;
|
||||
|
||||
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
|
||||
taosArrayClear(pVals);
|
||||
|
||||
bool isStartKey = false;
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
STColumn* pCol = &pTSchema->columns[k];
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
ASSERT(pColInfoData->info.type == pCol->type);
|
||||
if (!isStartKey) {
|
||||
isStartKey = true;
|
||||
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, (SValue){.val = *(TSKEY*)var});
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else if (colDataIsNull_s(pColInfoData, j)) {
|
||||
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type);
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else {
|
||||
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, (SValue){.val = *(int64_t*)var});
|
||||
taosArrayPush(pVals, &cv);
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
ASSERT(pColInfoData->info.type == pCol->type);
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type);
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else {
|
||||
void* data = colDataGetVarData(pColInfoData, j);
|
||||
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
|
||||
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, sv);
|
||||
taosArrayPush(pVals, &cv);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
||||
ASSERT(0);
|
||||
break;
|
||||
default:
|
||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); // should use pCol->type
|
||||
taosArrayPush(pVals, &cv);
|
||||
} else {
|
||||
SValue sv;
|
||||
if (pCol->type == pColInfoData->info.type) {
|
||||
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
|
||||
} else {
|
||||
/**
|
||||
* 1. sum/avg would convert to int64_t/uint64_t/double during aggregation
|
||||
* 2. below conversion may lead to overflow or loss, the app should select the right data type.
|
||||
*/
|
||||
char tv[8] = {0};
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = 0;
|
||||
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||
int64_t v = 0;
|
||||
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else {
|
||||
uint64_t v = 0;
|
||||
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
}
|
||||
memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes);
|
||||
}
|
||||
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, sv);
|
||||
taosArrayPush(pVals, &cv);
|
||||
}
|
||||
} else {
|
||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||
ASSERT(0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
SRow* pRow = NULL;
|
||||
tRowBuild(pVals, pTSchema, &pRow);
|
||||
if (pRow) {
|
||||
taosArrayPush(pTbData->aRowP, &pRow);
|
||||
}
|
||||
}
|
||||
}
|
||||
_end:
|
||||
if (terrno != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
||||
ASSERT(stbFullName[0] != 0);
|
||||
|
|
|
@ -158,8 +158,8 @@ int32_t tsdbCommit(STsdb* pTsdb);
|
|||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
|
||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
|
||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||
|
@ -220,7 +220,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
|||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
|
||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t len, int32_t inputType);
|
||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
||||
|
|
|
@ -601,8 +601,8 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
// TODO: spin lock for race conditiond
|
||||
SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq;
|
||||
// spin lock for race condition during insert data
|
||||
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -610,29 +610,19 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSRow *row = NULL;
|
||||
static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
|
||||
SArray *pSubmitTbData = pMsg ? pMsg->aSubmitTbData : NULL;
|
||||
int32_t size = taosArrayGetSize(pSubmitTbData);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSubmitTbData *pData = TARRAY_GET_ELEM(pSubmitTbData, i);
|
||||
if (terrno = tdUidStorePut(pStore, pData->suid, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!pBlock) break;
|
||||
tdUidStorePut(pStore, msgIter.suid, NULL);
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -712,7 +702,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
output->info.rows);
|
||||
|
||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||
SSubmitReq *pReq = NULL;
|
||||
SSubmitReq2 *pReq = NULL;
|
||||
|
||||
// TODO: the schema update should be handled later(TD-17965)
|
||||
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||
|
@ -730,10 +720,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
goto _err;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64
|
||||
" len %" PRIu32,
|
||||
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version,
|
||||
htonl(pReq->header.contLen));
|
||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
|
||||
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
|
||||
|
||||
taosMemoryFreeClear(pReq);
|
||||
}
|
||||
|
@ -754,21 +742,22 @@ _err:
|
|||
*
|
||||
* @param pSma
|
||||
* @param pMsg
|
||||
* @param len
|
||||
* @param inputType
|
||||
* @param pInfo
|
||||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
|
||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo,
|
||||
tb_uid_t suid) {
|
||||
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
|
||||
const SSubmitReq2 *pReq = (const SSubmitReq2 *)pMsg;
|
||||
|
||||
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
|
||||
void *qItem = taosAllocateQitem(len, DEF_QITEM);
|
||||
if (!qItem) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
memcpy(qItem, pMsg, pReq->header.contLen);
|
||||
memcpy(qItem, pMsg, len);
|
||||
|
||||
taosWriteQitem(pInfo->queue, qItem);
|
||||
|
||||
|
@ -1015,7 +1004,7 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
* @param suid
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
|
||||
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) {
|
||||
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
|
||||
if (!pRSmaInfo) {
|
||||
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
|
||||
|
@ -1023,7 +1012,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
|
|||
}
|
||||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) {
|
||||
if (tdExecuteRSmaImplAsync(pSma, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -1045,7 +1034,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
||||
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t len, int32_t inputType) {
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
if (!pEnv) {
|
||||
// only applicable when rsma env exists
|
||||
|
@ -1060,18 +1049,21 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
|||
|
||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (uidStore.suid != 0) {
|
||||
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) {
|
||||
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, uidStore.suid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
|
||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||
if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) {
|
||||
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, *pTbSuid) < 0) {
|
||||
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
@ -1081,7 +1073,6 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
tdUidStoreDestory(&uidStore);
|
||||
smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,14 +26,17 @@ static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 921
|
|||
|
||||
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||
|
||||
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
int32_t affectedrows = 0;
|
||||
int32_t numOfRows = 0;
|
||||
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 *pRsp) {
|
||||
int32_t arrSize = 0;
|
||||
int32_t affectedrows = 0;
|
||||
int32_t numOfRows = 0;
|
||||
|
||||
ASSERT(pTsdb->mem != NULL);
|
||||
|
||||
if (pMsg) {
|
||||
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
|
||||
}
|
||||
|
||||
// scan and convert
|
||||
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
|
||||
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
|
@ -43,22 +46,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
|||
}
|
||||
|
||||
// loop to insert
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
SSubmitBlkRsp r = {0};
|
||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
#if 0
|
||||
if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
if ((terrno = tsdbInsertTableData(pTsdb, version, taosArrayGet(pMsg->aSubmitTbData, i), &affectedrows)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
#else
|
||||
ASSERT(0);
|
||||
#endif
|
||||
|
||||
numOfRows += msgIter.numOfRows;
|
||||
}
|
||||
|
||||
if (pRsp != NULL) {
|
||||
|
@ -86,9 +77,8 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *
|
|||
}
|
||||
#endif
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now) {
|
||||
TSKEY rowKey = TD_ROW_KEY(row);
|
||||
if (rowKey < minKey || rowKey > maxKey) {
|
||||
tsdbError("vgId:%d, table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64 " row key %" PRId64,
|
||||
|
@ -100,6 +90,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
|
||||
|
@ -159,6 +150,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|||
}
|
||||
}
|
||||
#endif
|
||||
// pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
|
||||
|
@ -167,6 +159,46 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
|
||||
TSKEY now = taosGetTimestamp(pCfg->precision);
|
||||
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
|
||||
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
||||
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
|
||||
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
uint64_t nColData = TARRAY_SIZE(pData->aCol);
|
||||
SColData *aColData = (SColData *)TARRAY_DATA(pData->aCol);
|
||||
if (nColData > 0) {
|
||||
int32_t nRows = aColData[0].nVal;
|
||||
TSKEY *aKey = (TSKEY *)aColData[0].pData;
|
||||
for (int32_t r = 0; r < nRows; ++r) {
|
||||
if (tsdbCheckRowRange(pTsdb, pData->uid, aKey[r], minKey, maxKey, now) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int32_t nRows = taosArrayGetSize(pData->aRowP);
|
||||
for (int32_t r = 0; r < nRows; ++r) {
|
||||
SRow *pRow = (SRow *)taosArrayGetP(pData->aRowP, r);
|
||||
if (tsdbCheckRowRange(pTsdb, pData->uid, pRow->ts, minKey, maxKey, now) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
return 0;
|
||||
}
|
|
@ -236,7 +236,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
break;
|
||||
/* TSDB */
|
||||
case TDMT_VND_SUBMIT:
|
||||
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
|
||||
if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_DELETE:
|
||||
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
|
@ -868,7 +868,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
// decode
|
||||
SDecoder dc = {0};
|
||||
tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead));
|
||||
tDecoderInit(&dc, pReq, len);
|
||||
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
|
@ -876,6 +876,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
tDecoderClear(&dc);
|
||||
|
||||
// check
|
||||
code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
|
||||
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
|
||||
|
||||
|
@ -982,6 +987,7 @@ _exit:
|
|||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||
if (code == 0) {
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||
tdProcessRSmaSubmit(pVnode->pSma, pReq, len, STREAM_INPUT__DATA_SUBMIT);
|
||||
}
|
||||
|
||||
// clear
|
||||
|
|
Loading…
Reference in New Issue