Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
18ade9c106
|
@ -3642,7 +3642,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
|
||||||
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
|
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
|
||||||
void tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
|
void tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
|
||||||
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
||||||
int32_t tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight);
|
||||||
void tOffsetDestroy(void* pVal);
|
void tOffsetDestroy(void* pVal);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -9238,18 +9238,17 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
tOffsetDestroy(pLeft);
|
tOffsetDestroy(pLeft);
|
||||||
*pLeft = *pRight;
|
*pLeft = *pRight;
|
||||||
if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) {
|
if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) {
|
||||||
pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData);
|
pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData);
|
||||||
if (pLeft->primaryKey.pData == NULL) {
|
if (pLeft->primaryKey.pData == NULL) {
|
||||||
uError("failed to allocate memory for offset");
|
uError("failed to allocate memory for offset");
|
||||||
return terrno;
|
return;
|
||||||
}
|
}
|
||||||
(void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
|
(void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tOffsetDestroy(void *param) {
|
void tOffsetDestroy(void *param) {
|
||||||
|
|
|
@ -214,8 +214,17 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
|
||||||
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
|
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
|
||||||
void *pHead = rpcMallocCont(contLen);
|
if (contLen < 0) {
|
||||||
tSerializeSNotifyReq(pHead, contLen, pReq);
|
dError("failed to serialize notify req since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
void *pHead = rpcMallocCont(contLen);
|
||||||
|
contLen = tSerializeSNotifyReq(pHead, contLen, pReq);
|
||||||
|
if (contLen < 0) {
|
||||||
|
rpcFreeCont(pHead);
|
||||||
|
dError("failed to serialize notify req since %s", tstrerror(contLen));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.pCont = pHead,
|
SRpcMsg rpcMsg = {.pCont = pHead,
|
||||||
.contLen = contLen,
|
.contLen = contLen,
|
||||||
|
|
|
@ -1437,7 +1437,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
(void) tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
|
tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1126,7 +1126,7 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
taosArrayDestroy(pBackend->chkpSaved);
|
taosArrayDestroy(pBackend->chkpSaved);
|
||||||
pBackend->chkpSaved = chkpDup;
|
pBackend->chkpSaved = chkpDup;
|
||||||
|
|
||||||
taosThreadRwlockUnlock(&pBackend->chkpDirLock);
|
(void)taosThreadRwlockUnlock(&pBackend->chkpDirLock);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
|
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
|
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
|
||||||
|
@ -1438,7 +1438,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taosArrayPush(pSnap, &snap);
|
(void)taosArrayPush(pSnap, &snap);
|
||||||
|
|
||||||
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
|
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
|
||||||
}
|
}
|
||||||
|
@ -1505,7 +1505,7 @@ void* taskAcquireDb(int64_t refId) {
|
||||||
}
|
}
|
||||||
void taskReleaseDb(int64_t refId) {
|
void taskReleaseDb(int64_t refId) {
|
||||||
// release
|
// release
|
||||||
taosReleaseRef(taskDbWrapperId, refId);
|
(void)taosReleaseRef(taskDbWrapperId, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taskGetDBRef(void* arg) {
|
int64_t taskGetDBRef(void* arg) {
|
||||||
|
@ -1566,7 +1566,7 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
|
||||||
code = 0;
|
code = 0;
|
||||||
_EXIT:
|
_EXIT:
|
||||||
taosMemoryFree(pDst);
|
taosMemoryFree(pDst);
|
||||||
taosCloseFile(&pFile);
|
(void)taosCloseFile(&pFile);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
|
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
|
||||||
|
@ -4101,7 +4101,7 @@ void streamStateSessionClear_rocksdb(SStreamState* pState) {
|
||||||
if (code == 0 && size > 0) {
|
if (code == 0 && size > 0) {
|
||||||
memset(buf, 0, size);
|
memset(buf, 0, size);
|
||||||
// refactor later
|
// refactor later
|
||||||
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
|
(void)streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFreeClear(buf);
|
taosMemoryFreeClear(buf);
|
||||||
break;
|
break;
|
||||||
|
@ -4311,7 +4311,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
|
||||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
void* val, int32_t vlen, int64_t ttl) {
|
void* val, int32_t vlen, int64_t ttl) {
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
int i = streamStateGetCfIdx(pState, cfKeyName);
|
int i = streamStateGetCfIdx(pState, cfKeyName);
|
||||||
if (i < 0) {
|
if (i < 0) {
|
||||||
|
@ -4357,7 +4357,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
|
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
(void)(ginitDict[cfIdx].toStrFunc((void*)key, tbuf));
|
||||||
stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -4365,7 +4365,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
(void)atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("streamState failed to write batch, err:%s", err);
|
stError("streamState failed to write batch, err:%s", err);
|
||||||
|
@ -4745,7 +4745,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
||||||
static char* chkpMeta = "META";
|
static char* chkpMeta = "META";
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&p->rwLock);
|
(void)taosThreadRwlockRdlock(&p->rwLock);
|
||||||
|
|
||||||
int32_t cap = p->len + 128;
|
int32_t cap = p->len + 128;
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ int32_t doRegComp(pcre2_code** ppRegex, pcre2_match_data** ppMatchData, const ch
|
||||||
*ppRegex = pcre2_compile((PCRE2_SPTR8)pattern, PCRE2_ZERO_TERMINATED, options, &errorcode, &erroroffset, NULL);
|
*ppRegex = pcre2_compile((PCRE2_SPTR8)pattern, PCRE2_ZERO_TERMINATED, options, &errorcode, &erroroffset, NULL);
|
||||||
if (*ppRegex == NULL) {
|
if (*ppRegex == NULL) {
|
||||||
PCRE2_UCHAR buffer[256];
|
PCRE2_UCHAR buffer[256];
|
||||||
pcre2_get_error_message(errorcode, buffer, sizeof(buffer));
|
(void)pcre2_get_error_message(errorcode, buffer, sizeof(buffer));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ int32_t doRegExec(const char* pString, pcre2_code* pRegex, pcre2_match_data* pMa
|
||||||
ret = pcre2_match(pRegex, (PCRE2_SPTR)pString, PCRE2_ZERO_TERMINATED, 0, 0, pMatchData, NULL);
|
ret = pcre2_match(pRegex, (PCRE2_SPTR)pString, PCRE2_ZERO_TERMINATED, 0, 0, pMatchData, NULL);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
PCRE2_UCHAR buffer[256];
|
PCRE2_UCHAR buffer[256];
|
||||||
pcre2_get_error_message(ret, buffer, sizeof(buffer));
|
(void)pcre2_get_error_message(ret, buffer, sizeof(buffer));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue