Merge pull request #15772 from taosdata/feature/stream
fix(util): concurrently hash iterate
This commit is contained in:
commit
ebe133a5b8
|
@ -202,11 +202,6 @@ typedef struct {
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
} STaskSinkFetch;
|
} STaskSinkFetch;
|
||||||
|
|
||||||
enum {
|
|
||||||
TASK_SOURCE__SCAN = 1,
|
|
||||||
TASK_SOURCE__PIPE,
|
|
||||||
};
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_EXEC__NONE = 1,
|
TASK_EXEC__NONE = 1,
|
||||||
TASK_EXEC__PIPE,
|
TASK_EXEC__PIPE,
|
||||||
|
@ -225,11 +220,6 @@ enum {
|
||||||
TASK_SINK__FETCH,
|
TASK_SINK__FETCH,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
|
||||||
TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
|
|
||||||
TASK_INPUT_TYPE__DATA_BLOCK,
|
|
||||||
};
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
|
TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
|
||||||
TASK_TRIGGER_STATUS__ACTIVE,
|
TASK_TRIGGER_STATUS__ACTIVE,
|
||||||
|
|
|
@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeStreamThreads = tsNumOfCores;
|
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
|
||||||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
|
|
|
@ -151,8 +151,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
|
||||||
// meta section end
|
// meta section end
|
||||||
|
|
||||||
// seek section
|
// seek section
|
||||||
int walChangeWrite(SWal* pWal, int64_t ver);
|
int64_t walChangeWrite(SWal* pWal, int64_t ver);
|
||||||
int walInitWriteFile(SWal* pWal);
|
int walInitWriteFile(SWal* pWal);
|
||||||
// seek section end
|
// seek section end
|
||||||
|
|
||||||
int64_t walGetSeq();
|
int64_t walGetSeq();
|
||||||
|
|
|
@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walChangeWrite(SWal* pWal, int64_t ver) {
|
int64_t walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
int code;
|
int code;
|
||||||
TdFilePtr pIdxTFile, pLogTFile;
|
TdFilePtr pIdxTFile, pLogTFile;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
|
|
@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
taosCloseFile(&pIdxFile);
|
taosCloseFile(&pIdxFile);
|
||||||
taosCloseFile(&pLogFile);
|
taosCloseFile(&pLogFile);
|
||||||
|
|
||||||
|
taosFsyncFile(pWal->pLogFile);
|
||||||
|
taosFsyncFile(pWal->pIdxFile);
|
||||||
|
|
||||||
walSaveMeta(pWal);
|
walSaveMeta(pWal);
|
||||||
|
|
||||||
// unlock
|
// unlock
|
||||||
|
|
|
@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
|
||||||
uint16_t prevRef = atomic_load_16(&pNode->refCount);
|
/*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
|
||||||
uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1);
|
uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1);
|
||||||
|
#if 0
|
||||||
ASSERT(prevRef < afterRef);
|
ASSERT(prevRef < afterRef);
|
||||||
|
|
||||||
// the reference count value is overflow, which will cause the delete node operation immediately.
|
// the reference count value is overflow, which will cause the delete node operation immediately.
|
||||||
|
@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||||
} else {
|
} else {
|
||||||
data = GET_HASH_NODE_DATA(pNode);
|
data = GET_HASH_NODE_DATA(pNode);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
data = GET_HASH_NODE_DATA(pNode);
|
||||||
|
|
||||||
if (afterRef >= MAX_WARNING_REF_COUNT) {
|
if (afterRef >= MAX_WARNING_REF_COUNT) {
|
||||||
uWarn("hash entry ref count is abnormally high: %d", afterRef);
|
uWarn("hash entry ref count is abnormally high: %d", afterRef);
|
||||||
|
|
Loading…
Reference in New Issue