Merge pull request #27032 from taosdata/fix/syntax

fix(stream):set accept code for trans.
This commit is contained in:
Haojun Liao 2024-08-08 09:08:08 +08:00 committed by GitHub
commit 3ae295dd1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 99 deletions

View File

@ -533,7 +533,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
return code;
}
code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0);
code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code) {
taosMemoryFree(buf);
}

View File

@ -320,7 +320,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
return terrno;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return terrno;
@ -424,7 +424,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
(void) epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
@ -714,7 +714,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0);
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}

View File

@ -28,7 +28,7 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pLoadInfo->blockData[0].sttBlockIndex = -1;
@ -50,9 +50,8 @@ int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t num
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
if (pLoadInfo->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pLoadInfo);
return code;
return terrno;
}
pLoadInfo->pSchema = pSchema;
@ -358,7 +357,7 @@ static int32_t tValueDupPayload(SValue *pVal) {
char *p = (char *)pVal->pData;
char *pBuf = taosMemoryMalloc(pVal->nData);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(pBuf, p, pVal->nData);
@ -371,13 +370,15 @@ static int32_t tValueDupPayload(SValue *pVal) {
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* px = NULL;
int32_t startIndex = 0;
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
if (numOfBlocks <= 0) {
return code;
}
int32_t startIndex = 0;
while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
++startIndex;
}
@ -413,150 +414,113 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
// existed
if (i < rows) {
if (pBlockLoadInfo->info.pUid == NULL) {
pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pFirstTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pLastTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t));
SSttTableRowsInfo* pInfo = &pBlockLoadInfo->info;
pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(SValue));
pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(SValue));
if (pInfo->pUid == NULL) {
pInfo->pUid = taosArrayInit(rows, sizeof(int64_t));
pInfo->pFirstTs = taosArrayInit(rows, sizeof(int64_t));
pInfo->pLastTs = taosArrayInit(rows, sizeof(int64_t));
pInfo->pCount = taosArrayInit(rows, sizeof(int64_t));
pInfo->pFirstKey = taosArrayInit(rows, sizeof(SValue));
pInfo->pLastKey = taosArrayInit(rows, sizeof(SValue));
if (pInfo->pUid == NULL || pInfo->pFirstTs == NULL || pInfo->pLastTs == NULL || pInfo->pCount == NULL ||
pInfo->pFirstKey == NULL || pInfo->pLastKey == NULL) {
code = terrno;
goto _end;
}
}
if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
int32_t size = rows - i;
int32_t offset = i * sizeof(int64_t);
px = taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, offset), size);
if (px == NULL) {
return terrno;
}
px = taosArrayAddBatch(pInfo->pUid, tBufferGetDataAt(&block.uids, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, offset), size);
if (px == NULL){
return terrno;
}
px = taosArrayAddBatch(pInfo->pCount, tBufferGetDataAt(&block.counts, offset), size);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
if (block.numOfPKs > 0) {
SValue vFirst = {0}, vLast = {0};
for (int32_t f = i; f < rows; ++f) {
code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
code = tValueDupPayload(&vFirst);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
// todo add api to clone the original data
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
code = tValueDupPayload(&vLast);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &vLast);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
} else {
SValue vFirst = {0};
for (int32_t j = 0; j < size; ++j) {
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vFirst);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &vFirst);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
}
} else {
STbStatisRecord record = {0};
while (i < rows) {
(void)tStatisBlockGet(&block, i, &record);
if (record.suid != suid) {
break;
}
px = taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pUid, &record.uid);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pCount, &record.count);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstTs, &record.firstKey.ts);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastTs, &record.lastKey.ts);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
if (record.firstKey.numOfPKs > 0) {
SValue s = record.firstKey.pks[0];
code = tValueDupPayload(&s);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &s);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
s = record.lastKey.pks[0];
code = tValueDupPayload(&s);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, _end);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &s);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
} else {
SValue v = {0};
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &v);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pFirstKey, &v);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &v);
if (px == NULL) {
return terrno;
}
px = taosArrayPush(pInfo->pLastKey, &v);
TSDB_CHECK_NULL(px, code, lino, _end, terrno);
}
i += 1;
@ -565,6 +529,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
}
}
_end:
(void)tStatisBlockDestroy(&block);
double el = (taosGetTimestampUs() - st) / 1000.0;