refact: adapt to new sml api

This commit is contained in:
Liu Jicong 2022-12-26 10:14:59 +08:00
parent 78add31d45
commit 834495d62a
3 changed files with 21 additions and 14 deletions

View File

@ -360,7 +360,7 @@ size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.ro
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
if (pDataBlock->info.rows > 0) { if (pDataBlock->info.rows > 0) {
// ASSERT(pDataBlock->info.dataLoad == 1); // ASSERT(pDataBlock->info.dataLoad == 1);
} }
if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
@ -1169,7 +1169,8 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
// todo temporarily disable it // todo temporarily disable it
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
bool clearPayload) {
ASSERT(numOfRows > 0); ASSERT(numOfRows > 0);
if (numOfRows <= pBlockInfo->capacity) { if (numOfRows <= pBlockInfo->capacity) {
@ -1228,7 +1229,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
pColumn->hasNull = false; pColumn->hasNull = false;
if (IS_VAR_DATA_TYPE(pColumn->info.type)) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
@ -1956,7 +1957,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n", "|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey); pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
@ -2388,7 +2390,7 @@ _end:
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
ASSERT(stbFullName[0] != 0); ASSERT(stbFullName[0] != 0);
SArray* tags = taosArrayInit(0, sizeof(void*)); SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
if (tags == NULL) { if (tags == NULL) {
return NULL; return NULL;
} }
@ -2399,8 +2401,10 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
return NULL; return NULL;
} }
SSmlKv pTag = {.key = "group_id", .keyLen = sizeof("group_id") - 1, SSmlKv pTag = {.key = "group_id",
.type = TSDB_DATA_TYPE_UBIGINT, .u = groupId, .keyLen = sizeof("group_id") - 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.u = groupId,
.length = sizeof(uint64_t)}; .length = sizeof(uint64_t)};
taosArrayPush(tags, &pTag); taosArrayPush(tags, &pTag);

View File

@ -948,6 +948,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tbData.aRowP, &pRow); taosArrayPush(tbData.aRowP, &pRow);
} }
taosArrayClear(pReq->aSubmitTbData);
taosArrayPush(pReq->aSubmitTbData, &tbData); taosArrayPush(pReq->aSubmitTbData, &tbData);
// encode // encode

View File

@ -950,6 +950,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
} }
} }
vDebug("vgId:%d, submit block size %d", TD_VID(pVnode), (int32_t)taosArrayGetSize(pSubmitReq->aSubmitTbData));
// loop to handle // loop to handle
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
@ -1040,13 +1042,13 @@ _exit:
#else #else
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
int32_t nRows = 0; int32_t nRows = 0;
int32_t tsize, ret; int32_t tsize, ret;
SEncoder encoder = {0}; SEncoder encoder = {0};
SArray *newTbUids = NULL; SArray *newTbUids = NULL;
SVStatis statis = {0}; SVStatis statis = {0};
bool tbCreated = false; bool tbCreated = false;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0; pRsp->code = 0;