Merge pull request #26874 from taosdata/fix/syntax

fix(query): check the return value.
This commit is contained in:
Haojun Liao 2024-07-30 17:14:35 +08:00 committed by GitHub
commit ec670dc8f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 38 additions and 29 deletions

View File

@ -1020,10 +1020,10 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) { int8_t mndTrigger, bool lock) {
int32_t code = -1; int32_t code = TSDB_CODE_SUCCESS;
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
return TSDB_CODE_SUCCESS; return code;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
@ -1087,13 +1087,11 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
goto _ERR; goto _ERR;
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { code = mndTransPrepare(pMnode, pTrans);
code = terrno; if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to prepare checkpoint trans since %s", terrstr()); mError("failed to prepare checkpoint trans since %s", terrstr());
goto _ERR;
} }
code = 0;
_ERR: _ERR:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -1458,7 +1456,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2179,7 +2178,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2273,7 +2272,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -2434,7 +2434,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -3129,7 +3129,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream,
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);

View File

@ -94,7 +94,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != 0) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -197,7 +197,8 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
return code; return code;
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;

View File

@ -1132,7 +1132,7 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);

View File

@ -101,7 +101,10 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
return terrno; return terrno;
} }
taosArrayPush(pBucket->pPageIdList, &newPageId); void* px = taosArrayPush(pBucket->pPageIdList, &newPageId);
if (px == NULL) {
return terrno;
}
doCopyObject(pNewPage->data, key, keyLen, data, size); doCopyObject(pNewPage->data, key, keyLen, data, size);
pNewPage->num = sizeof(SFilePage) + nodeSize; pNewPage->num = sizeof(SFilePage) + nodeSize;
@ -127,7 +130,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
char* p = (char*)pNode + len; char* p = (char*)pNode + len;
char* pEnd = (char*)pPage + pPage->num; char* pEnd = (char*)pPage + pPage->num;
memmove(pNode, p, (pEnd - p)); (void) memmove(pNode, p, (pEnd - p));
pPage->num -= len; pPage->num -= len;
if (pPage->num == 0) { if (pPage->num == 0) {
@ -189,7 +192,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
nodeSize = GET_LHASH_NODE_LEN(pStart); nodeSize = GET_LHASH_NODE_LEN(pStart);
} else { // move to the front of pLast page } else { // move to the front of pLast page
if (pStart != pLast->data) { if (pStart != pLast->data) {
memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart)); (void) memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart));
setBufPageDirty(pLast, true); setBufPageDirty(pLast, true);
} }
@ -235,7 +238,10 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
setBufPageDirty(p, true); setBufPageDirty(p, true);
releaseBufPage(pHashObj->pBuf, p); releaseBufPage(pHashObj->pBuf, p);
taosArrayPush(pBucket->pPageIdList, &pageId); void* px = taosArrayPush(pBucket->pPageIdList, &pageId);
if (px == NULL) {
return terrno;
}
pHashObj->numOfBuckets += 1; pHashObj->numOfBuckets += 1;
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); // printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
@ -251,7 +257,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE; terrno = TSDB_CODE_NO_DISKSPACE;
printf("tHash Init failed since %s, tempDir:%s", terrstr(), tsTempDir); (void) printf("tHash Init failed since %s, tempDir:%s", terrstr(), tsTempDir);
taosMemoryFree(pHashObj); taosMemoryFree(pHashObj);
return NULL; return NULL;
} }
@ -301,9 +307,10 @@ void* tHashCleanup(SLHashObj* pHashObj) {
} }
int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data, size_t size) { int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data, size_t size) {
int32_t code = 0;
if (pHashObj->bits == 0) { if (pHashObj->bits == 0) {
SLHashBucket* pBucket = pHashObj->pBucket[0]; SLHashBucket* pBucket = pHashObj->pBucket[0];
doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size); code = doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size);
} else { } else {
int32_t hashVal = pHashObj->hashFn(key, keyLen); int32_t hashVal = pHashObj->hashFn(key, keyLen);
int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits); int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits);
@ -315,10 +322,11 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
} }
SLHashBucket* pBucket = pHashObj->pBucket[v]; SLHashBucket* pBucket = pHashObj->pBucket[v];
int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
if (code != TSDB_CODE_SUCCESS) { }
return code;
} if (code) {
return code;
} }
pHashObj->size += 1; pHashObj->size += 1;
@ -327,7 +335,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) { if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) {
int32_t newBucketId = pHashObj->numOfBuckets; int32_t newBucketId = pHashObj->numOfBuckets;
int32_t code = doAddNewBucket(pHashObj); code = doAddNewBucket(pHashObj);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -362,7 +370,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
ASSERT(v1 == newBucketId); ASSERT(v1 == newBucketId);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); // printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, code = doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
GET_LHASH_NODE_KEY(pNode), pNode->dataLen); GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
doRemoveFromBucket(p, pNode, pBucket); doRemoveFromBucket(p, pNode, pBucket);
} else { } else {
@ -377,7 +385,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
doTrimBucketPages(pHashObj, pBucket); doTrimBucketPages(pHashObj, pBucket);
} }
return TSDB_CODE_SUCCESS; return code;
} }
char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) { char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) {
@ -420,8 +428,8 @@ int32_t tHashRemove(SLHashObj* pHashObj, const void* key, size_t keyLen) {
} }
void tHashPrint(const SLHashObj* pHashObj, int32_t type) { void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
printf("==================== linear hash ====================\n"); (void) printf("==================== linear hash ====================\n");
printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); (void) printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO);
dBufSetPrintInfo(pHashObj->pBuf); dBufSetPrintInfo(pHashObj->pBuf);