use safe func
This commit is contained in:
parent
4d51e218dc
commit
42519e9552
|
@ -1156,13 +1156,17 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
|||
|
||||
taosArrayDestroy(pBackend->chkpSaved);
|
||||
pBackend->chkpSaved = chkpDup;
|
||||
chkpDup = NULL;
|
||||
|
||||
TAOS_UNUSED(taosThreadRwlockUnlock(&pBackend->chkpDirLock));
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
|
||||
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
|
||||
char tbuf[256] = {0};
|
||||
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
|
||||
if (snprintf(tbuf, sizeof(tbuf), "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id) >= sizeof(tbuf)) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||
}
|
||||
|
||||
stInfo("backend remove obsolete checkpoint: %s", tbuf);
|
||||
if (taosIsDir(tbuf)) {
|
||||
|
@ -1187,12 +1191,17 @@ int chkpIdComp(const void* a, const void* b) {
|
|||
}
|
||||
int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||
int32_t code = 0;
|
||||
char* pChkpDir = taosMemoryCalloc(1, 256);
|
||||
int32_t nBytes = 0;
|
||||
int32_t cap = 256;
|
||||
char* pChkpDir = taosMemoryCalloc(1, cap);
|
||||
if (pChkpDir == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||
nBytes = snprintf(pChkpDir, cap, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints");
|
||||
if (nBytes >= cap) {
|
||||
return TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
if (!taosIsDir(pChkpDir)) {
|
||||
taosMemoryFree(pChkpDir);
|
||||
return 0;
|
||||
|
@ -1413,12 +1422,18 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
|
|||
if (pSnapInfo == NULL) return 0;
|
||||
SStreamMeta* pMeta = arg;
|
||||
int32_t code = 0;
|
||||
int32_t cap = 256;
|
||||
int32_t nBytes = 0;
|
||||
streamMutexLock(&pMeta->backendMutex);
|
||||
|
||||
char buf[128] = {0};
|
||||
char buf[256] = {0};
|
||||
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
|
||||
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
|
||||
nBytes = snprintf(buf, cap, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
|
||||
if (nBytes <= 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
break;
|
||||
}
|
||||
STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
|
||||
if (pTaskDb == NULL || *pTaskDb == NULL) {
|
||||
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
|
||||
|
@ -1430,7 +1445,7 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
|
|||
taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
|
||||
}
|
||||
streamMutexUnlock(&pMeta->backendMutex);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
|
||||
|
@ -2447,43 +2462,50 @@ void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
|
|||
|
||||
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
|
||||
int32_t code = 0;
|
||||
char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
|
||||
int32_t cap = strlen(path) + 128, nBytes = 0;
|
||||
char* statePath = NULL;
|
||||
char* dbPath = NULL;
|
||||
|
||||
statePath = taosMemoryCalloc(1, cap);
|
||||
if (statePath == NULL) {
|
||||
return terrno;
|
||||
TAOS_CHECK_GOTO(terrno, NULL, _err);
|
||||
}
|
||||
|
||||
nBytes = snprintf(statePath, cap, "%s%s%s", path, TD_DIRSEP, key);
|
||||
if (nBytes < 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key);
|
||||
if (!taosDirExist(statePath)) {
|
||||
code = taosMulMkDir(statePath);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code));
|
||||
taosMemoryFree(statePath);
|
||||
return code;
|
||||
}
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128);
|
||||
dbPath = taosMemoryCalloc(1, cap);
|
||||
if (dbPath == NULL) {
|
||||
taosMemoryFree(statePath);
|
||||
return terrno;
|
||||
TAOS_CHECK_GOTO(terrno, NULL, _err);
|
||||
}
|
||||
nBytes = snprintf(dbPath, cap, "%s%s%s", statePath, TD_DIRSEP, "state");
|
||||
if (nBytes < 0 || nBytes >= cap) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state");
|
||||
if (!taosDirExist(dbPath)) {
|
||||
code = taosMulMkDir(dbPath);
|
||||
if (code != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
|
||||
taosMemoryFree(statePath);
|
||||
taosMemoryFree(dbPath);
|
||||
return code;
|
||||
}
|
||||
TAOS_CHECK_GOTO(code, NULL, _err);
|
||||
}
|
||||
|
||||
*dbFullPath = dbPath;
|
||||
*stateFullPath = statePath;
|
||||
return 0;
|
||||
_err:
|
||||
stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
|
||||
|
||||
taosMemoryFree(statePath);
|
||||
taosMemoryFree(dbPath);
|
||||
return code;
|
||||
}
|
||||
|
||||
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
|
||||
|
@ -2864,6 +2886,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
int64_t streamId;
|
||||
int32_t taskId, dummy = 0;
|
||||
char suffix[64] = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
|
||||
|
@ -2873,6 +2896,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
for (int i = 0; i < nCf; i++) {
|
||||
char* cf = cfs[i];
|
||||
char funcname[64] = {0};
|
||||
|
||||
cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
|
||||
if (i == 0) continue;
|
||||
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||
|
@ -2909,7 +2933,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
taosMemoryFree(params);
|
||||
taosMemoryFree(cfOpts);
|
||||
// fix other leak
|
||||
return -1;
|
||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||
} else {
|
||||
stDebug("succ to open rocksdb cf");
|
||||
}
|
||||
|
@ -2929,8 +2953,13 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
|
||||
char funcname[64] = {0};
|
||||
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||
char idstr[128] = {0};
|
||||
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
char idstr[128] = {0};
|
||||
int32_t nBytes = snprintf(idstr, sizeof(idstr), "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
if (nBytes <= 0 || nBytes >= sizeof(idstr)) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
stError("failed to open cf since %s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int idx = streamStateGetCfIdx(NULL, funcname);
|
||||
|
||||
|
@ -4740,17 +4769,32 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
|
|||
void hashTableToDebug(SHashObj* pTbl, char** buf) {
|
||||
size_t sz = taosHashGetSize(pTbl);
|
||||
int32_t total = 0;
|
||||
char* p = taosMemoryCalloc(1, sz * 16 + 4);
|
||||
void* pIter = taosHashIterate(pTbl, NULL);
|
||||
int32_t cap = sz * 16 + 4;
|
||||
|
||||
char* p = taosMemoryCalloc(1, cap);
|
||||
if (p == NULL) {
|
||||
stError("failed to alloc memory for stream snapshot debug info");
|
||||
return;
|
||||
}
|
||||
|
||||
void* pIter = taosHashIterate(pTbl, NULL);
|
||||
while (pIter) {
|
||||
size_t len = 0;
|
||||
char* name = taosHashGetKey(pIter, &len);
|
||||
char* tname = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tname, name, len);
|
||||
total += sprintf(p + total, "%s,", tname);
|
||||
if (name == NULL || len <= 0) {
|
||||
pIter = taosHashIterate(pTbl, pIter);
|
||||
continue;
|
||||
}
|
||||
int32_t left = cap - strlen(p);
|
||||
int32_t nBytes = snprintf(p + total, left, "%s,", name);
|
||||
if (nBytes <= 0 || nBytes >= left) {
|
||||
stError("failed to debug snapshot info since %s", tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
taosMemoryFree(p);
|
||||
return;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pTbl, pIter);
|
||||
taosMemoryFree(tname);
|
||||
total += nBytes;
|
||||
}
|
||||
if (total > 0) {
|
||||
p[total - 1] = 0;
|
||||
|
@ -4761,13 +4805,30 @@ void strArrayDebugInfo(SArray* pArr, char** buf) {
|
|||
int32_t sz = taosArrayGetSize(pArr);
|
||||
if (sz <= 0) return;
|
||||
|
||||
char* p = (char*)taosMemoryCalloc(1, 64 + sz * 64);
|
||||
int32_t total = 0;
|
||||
int32_t code = 0;
|
||||
int32_t total = 0, nBytes = 0;
|
||||
int32_t cap = 64 + sz * 64;
|
||||
|
||||
char* p = (char*)taosMemoryCalloc(1, cap);
|
||||
if (p == NULL) {
|
||||
stError("failed to alloc memory for stream snapshot debug info");
|
||||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < sz; i++) {
|
||||
char* name = taosArrayGetP(pArr, i);
|
||||
total += sprintf(p + total, "%s,", name);
|
||||
char* name = taosArrayGetP(pArr, i);
|
||||
int32_t left = cap - strlen(p);
|
||||
nBytes = snprintf(p + total, left, "%s,", name);
|
||||
if (nBytes <= 0 || nBytes >= left) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
stError("failed to debug snapshot info since %s", tstrerror(code));
|
||||
taosMemoryFree(p);
|
||||
return;
|
||||
}
|
||||
|
||||
total += nBytes;
|
||||
}
|
||||
|
||||
p[total - 1] = 0;
|
||||
|
||||
*buf = p;
|
||||
|
@ -4777,16 +4838,16 @@ void dbChkpDebugInfo(SDbChkp* pDb) {
|
|||
char* p[4] = {NULL};
|
||||
|
||||
hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]);
|
||||
stTrace("chkp previous file: [%s]", p[0]);
|
||||
if (p[0]) stTrace("chkp previous file: [%s]", p[0]);
|
||||
|
||||
hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]);
|
||||
stTrace("chkp curr file: [%s]", p[1]);
|
||||
if (p[1]) stTrace("chkp curr file: [%s]", p[1]);
|
||||
|
||||
strArrayDebugInfo(pDb->pAdd, &p[2]);
|
||||
stTrace("chkp newly addded file: [%s]", p[2]);
|
||||
if (p[2]) stTrace("chkp newly addded file: [%s]", p[2]);
|
||||
|
||||
strArrayDebugInfo(pDb->pDel, &p[3]);
|
||||
stTrace("chkp newly deleted file: [%s]", p[3]);
|
||||
if (p[3]) stTrace("chkp newly deleted file: [%s]", p[3]);
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
taosMemoryFree(p[i]);
|
||||
|
|
Loading…
Reference in New Issue