Merge branch '3.0' into fix/TD-31870/os4
This commit is contained in:
commit
e3e3a0e2bb
|
@ -2516,24 +2516,25 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
|
||||||
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
|
||||||
if (len >= size - 1) {
|
if (len >= size - 1) {
|
||||||
return code;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s|", flag);
|
len += snprintf(dumpBuf + len, size - len, "%s|", flag);
|
||||||
if (len >= size - 1) {
|
if (len >= size - 1) {
|
||||||
return code;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < colNum; k++) {
|
for (int32_t k = 0; k < colNum; k++) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
if (pColInfoData == NULL) {
|
if (pColInfoData == NULL) {
|
||||||
return terrno;
|
code = terrno;
|
||||||
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2543,51 +2544,51 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
memset(pBuf, 0, sizeof(pBuf));
|
memset(pBuf, 0, sizeof(pBuf));
|
||||||
(void)formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision);
|
(void)formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision);
|
||||||
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
|
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int8_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint8_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int16_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(uint16_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_VARCHAR:
|
case TSDB_DATA_TYPE_VARCHAR:
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
|
@ -2598,24 +2599,33 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
dataSize = TMIN(dataSize, 50);
|
dataSize = TMIN(dataSize, 50);
|
||||||
memcpy(pBuf, varDataVal(pData), dataSize);
|
memcpy(pBuf, varDataVal(pData), dataSize);
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
} break;
|
} break;
|
||||||
case TSDB_DATA_TYPE_NCHAR: {
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
char* pData = colDataGetVarData(pColInfoData, j);
|
char* pData = colDataGetVarData(pColInfoData, j);
|
||||||
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
|
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
|
||||||
memset(pBuf, 0, sizeof(pBuf));
|
memset(pBuf, 0, sizeof(pBuf));
|
||||||
(void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
|
code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
|
||||||
|
if (code < 0) {
|
||||||
|
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
||||||
if (len >= size - 1) return 0;
|
if (len >= size - 1) goto _exit;
|
||||||
} break;
|
} break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
len += snprintf(dumpBuf + len, size - len, "%d\n", j);
|
len += snprintf(dumpBuf + len, size - len, "%d\n", j);
|
||||||
if (len >= size - 1) return code;
|
if (len >= size - 1) goto _exit;
|
||||||
}
|
}
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
|
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
|
||||||
|
|
||||||
*pDataBuf = dumpBuf;
|
*pDataBuf = dumpBuf;
|
||||||
|
dumpBuf = NULL;
|
||||||
|
_exit:
|
||||||
|
if (dumpBuf) {
|
||||||
|
taosMemoryFree(dumpBuf);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2221,7 +2221,11 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void) epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file
|
int32_t ret = epsetToStr(&pEntry->epset, buf, tListLen(buf)); // ignore this error since it is only for log file
|
||||||
|
if (ret != 0) { // print error and continue
|
||||||
|
mError("failed to convert epset to str, code:%s", tstrerror(ret));
|
||||||
|
}
|
||||||
|
|
||||||
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2231,7 +2235,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
|
static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -2249,8 +2253,6 @@ static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
|
||||||
mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
|
mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function runs by only one thread, so it is not multi-thread safe
|
// this function runs by only one thread, so it is not multi-thread safe
|
||||||
|
@ -2311,7 +2313,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
|
mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
|
||||||
updateAllVgroups = true;
|
updateAllVgroups = true;
|
||||||
execInfo.switchFromFollower = false; // reset the flag
|
execInfo.switchFromFollower = false; // reset the flag
|
||||||
(void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
|
addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -194,10 +194,13 @@ int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tSerializeDropOrphanTaskMsg(pReq, contLen, &msg);
|
int32_t code = tSerializeDropOrphanTaskMsg(pReq, contLen, &msg);
|
||||||
|
if (code <= 0) {
|
||||||
|
mError("failed to serialize the drop orphan task msg, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
|
||||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
|
mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
|
@ -216,7 +219,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
|
||||||
mndKillTransImpl(pMnode, pMsg->transId, "");
|
mndKillTransImpl(pMnode, pMsg->transId, "");
|
||||||
|
|
||||||
streamMutexLock(&execInfo.lock);
|
streamMutexLock(&execInfo.lock);
|
||||||
(void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId);
|
code = mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId); // do thing if failed
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
|
code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
|
||||||
|
@ -393,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||||
if (numOfUpdated > 0) {
|
if (numOfUpdated > 0) {
|
||||||
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||||
(void) setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
int32_t unused = setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool snodeChanged = false;
|
bool snodeChanged = false;
|
||||||
|
|
|
@ -165,7 +165,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void)epsetToStr(&entry.epset, buf, tListLen(buf));
|
code = epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
if (code != 0) { // print error and continue
|
||||||
|
mError("failed to convert epset to str, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
void *p = taosArrayPush(pVgroupList, &entry);
|
void *p = taosArrayPush(pVgroupList, &entry);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -198,7 +201,10 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void)epsetToStr(&entry.epset, buf, tListLen(buf));
|
code = epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
if (code != 0) { // print error and continue
|
||||||
|
mError("failed to convert epset to str, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
void *p = taosArrayPush(pVgroupList, &entry);
|
void *p = taosArrayPush(pVgroupList, &entry);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -424,9 +430,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void) epsetToStr(&epset, buf, tListLen(buf));
|
code = epsetToStr(&epset, buf, tListLen(buf));
|
||||||
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
if (code != 0) { // print error and continue
|
||||||
|
mError("failed to convert epset to str, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
||||||
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
@ -639,8 +648,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
||||||
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
(void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||||
|
|
||||||
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -914,8 +922,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. remove stream entry in consensus hash table and checkpoint-report hash table
|
// 2. remove stream entry in consensus hash table and checkpoint-report hash table
|
||||||
(void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
|
code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
|
||||||
(void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
|
if (code) {
|
||||||
|
mError("failed to clear consensus checkpointId, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
|
||||||
|
if (code) {
|
||||||
|
mError("failed to clear the checkpoint report info, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pExecNode->lock);
|
streamMutexUnlock(&pExecNode->lock);
|
||||||
destroyStreamTaskIter(pIter);
|
destroyStreamTaskIter(pIter);
|
||||||
|
|
|
@ -229,7 +229,7 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
|
||||||
|
|
||||||
if (p->info.rows > 0) {
|
if (p->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -150,10 +150,11 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
|
||||||
pBucket->size -= 1;
|
pBucket->size -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
|
static int32_t doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
|
||||||
|
int32_t code = 0;
|
||||||
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
|
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
|
||||||
if (numOfPages <= 1) {
|
if (numOfPages <= 1) {
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0);
|
int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0);
|
||||||
|
@ -164,11 +165,14 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
|
||||||
|
|
||||||
if (pLast->num <= sizeof(SFilePage)) {
|
if (pLast->num <= sizeof(SFilePage)) {
|
||||||
// this is empty
|
// this is empty
|
||||||
// TODO check ret
|
code = dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
|
||||||
(void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed to recycle buf page since %s", __func__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
releaseBufPage(pHashObj->pBuf, pFirst);
|
releaseBufPage(pHashObj->pBuf, pFirst);
|
||||||
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
|
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* pStart = pLast->data;
|
char* pStart = pLast->data;
|
||||||
|
@ -191,8 +195,11 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
|
||||||
pStart += nodeSize;
|
pStart += nodeSize;
|
||||||
if (pLast->num <= sizeof(SFilePage)) {
|
if (pLast->num <= sizeof(SFilePage)) {
|
||||||
// this is empty
|
// this is empty
|
||||||
// TODO check ret
|
code = dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
|
||||||
(void)dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed to recycle buf page since %s", __func__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
releaseBufPage(pHashObj->pBuf, pFirst);
|
releaseBufPage(pHashObj->pBuf, pFirst);
|
||||||
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
|
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
|
||||||
break;
|
break;
|
||||||
|
@ -210,6 +217,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
|
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
|
||||||
|
@ -403,7 +411,10 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
|
||||||
releaseBufPage(pHashObj->pBuf, p);
|
releaseBufPage(pHashObj->pBuf, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
doTrimBucketPages(pHashObj, pBucket);
|
code = doTrimBucketPages(pHashObj, pBucket);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1308,8 +1308,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo
|
TAOS_CHECK_RETURN(taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET));
|
||||||
(void)taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
|
|
||||||
|
|
||||||
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
|
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
|
||||||
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
|
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
|
||||||
|
|
|
@ -1535,8 +1535,9 @@ int taosSeekCFile(FILE *file, int64_t offset, int whence) {
|
||||||
int code = fseeko(file, offset, whence);
|
int code = fseeko(file, offset, whence);
|
||||||
if (-1 == code) {
|
if (-1 == code) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
code = terrno;
|
||||||
}
|
}
|
||||||
return terrno;
|
return code;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -351,7 +351,7 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
iconv_t conv = taosAcquireConv(&idx, C2M);
|
iconv_t conv = taosAcquireConv(&idx, C2M);
|
||||||
if ((iconv_t)-1 == conv) {
|
if ((iconv_t)-1 == conv) {
|
||||||
return TSDB_CODE_APP_ERROR;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ucs4_input_len = ucs4_max_len;
|
size_t ucs4_input_len = ucs4_max_len;
|
||||||
|
|
|
@ -316,7 +316,7 @@ static char* evictBufPage(SDiskbasedBuf* pBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
(void)tdListPopNode(pBuf->lruList, pn);
|
pn = tdListPopNode(pBuf->lruList, pn);
|
||||||
|
|
||||||
SPageInfo* d = *(SPageInfo**)pn->data;
|
SPageInfo* d = *(SPageInfo**)pn->data;
|
||||||
|
|
||||||
|
@ -337,7 +337,7 @@ static int32_t lruListPushFront(SList* pList, SPageInfo* pi) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
|
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
|
||||||
(void)tdListPopNode(pList, pi->pn);
|
pi->pn = tdListPopNode(pList, pi->pn);
|
||||||
tdListPrependNode(pList, pi->pn);
|
tdListPrependNode(pList, pi->pn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,8 +474,11 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||||
pBuf->totalBufSize += pBuf->pageSize;
|
pBuf->totalBufSize += pBuf->pageSize;
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(availablePage);
|
taosMemoryFree(availablePage);
|
||||||
(void)taosArrayPop(pBuf->pIdList);
|
SPageInfo **pLast = taosArrayPop(pBuf->pIdList);
|
||||||
(void)tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
|
int32_t ret = tSimpleHashRemove(pBuf->all, pageId, sizeof(int32_t));
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("%s failed to clear pageId %d from buf hash-set since %s", __func__, *pageId, tstrerror(ret));
|
||||||
|
}
|
||||||
taosMemoryFree(pi);
|
taosMemoryFree(pi);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue