Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/row_optimize
This commit is contained in:
commit
151d07e898
|
@ -208,7 +208,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
|
|
||||||
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
|
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
|
||||||
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
pInfo->pRes->info.groupId = pKeyInfo->groupId;
|
||||||
|
|
||||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||||
|
|
|
@ -4366,6 +4366,7 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
||||||
int32_t startIndex;
|
int32_t startIndex;
|
||||||
if (numOfParams != 4) {
|
if (numOfParams != 4) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4376,15 +4377,18 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
||||||
|
|
||||||
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
||||||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4402,12 +4406,14 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
// linear bin process
|
// linear bin process
|
||||||
if (width->valuedouble == 0) {
|
if (width->valuedouble == 0) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < counter + 1; ++i) {
|
for (int i = 0; i < counter + 1; ++i) {
|
||||||
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
||||||
if (isinf(intervals[startIndex])) {
|
if (isinf(intervals[startIndex])) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
startIndex++;
|
startIndex++;
|
||||||
|
@ -4416,22 +4422,26 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
// log bin process
|
// log bin process
|
||||||
if (start->valuedouble == 0) {
|
if (start->valuedouble == 0) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < counter + 1; ++i) {
|
for (int i = 0; i < counter + 1; ++i) {
|
||||||
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
||||||
if (isinf(intervals[startIndex])) {
|
if (isinf(intervals[startIndex])) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
startIndex++;
|
startIndex++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4446,6 +4456,7 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
}
|
}
|
||||||
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
||||||
if (binType != USER_INPUT_BIN) {
|
if (binType != USER_INPUT_BIN) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
numOfBins = cJSON_GetArraySize(binDesc);
|
numOfBins = cJSON_GetArraySize(binDesc);
|
||||||
|
@ -4453,6 +4464,7 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
cJSON* bin = binDesc->child;
|
cJSON* bin = binDesc->child;
|
||||||
if (bin == NULL) {
|
if (bin == NULL) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -4460,16 +4472,19 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
intervals[i] = bin->valuedouble;
|
intervals[i] = bin->valuedouble;
|
||||||
if (!cJSON_IsNumber(bin)) {
|
if (!cJSON_IsNumber(bin)) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
bin = bin->next;
|
bin = bin->next;
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4482,6 +4497,8 @@ static bool getHistogramBinDesc(SHistoFuncInfo* pInfo, char* binDescStr, int8_t
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4495,18 +4512,23 @@ bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultIn
|
||||||
pInfo->totalCount = 0;
|
pInfo->totalCount = 0;
|
||||||
pInfo->normalized = 0;
|
pInfo->normalized = 0;
|
||||||
|
|
||||||
int8_t binType = getHistogramBinType(varDataVal(pCtx->param[1].param.pz));
|
char *binTypeStr = strndup(varDataVal(pCtx->param[1].param.pz), varDataLen(pCtx->param[1].param.pz));
|
||||||
|
int8_t binType = getHistogramBinType(binTypeStr);
|
||||||
|
taosMemoryFree(binTypeStr);
|
||||||
|
|
||||||
if (binType == UNKNOWN_BIN) {
|
if (binType == UNKNOWN_BIN) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
char* binDesc = varDataVal(pCtx->param[2].param.pz);
|
char* binDesc = strndup(varDataVal(pCtx->param[2].param.pz), varDataLen(pCtx->param[2].param.pz));
|
||||||
int64_t normalized = pCtx->param[3].param.i;
|
int64_t normalized = pCtx->param[3].param.i;
|
||||||
if (normalized != 0 && normalized != 1) {
|
if (normalized != 0 && normalized != 1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!getHistogramBinDesc(pInfo, binDesc, binType, (bool)normalized)) {
|
if (!getHistogramBinDesc(pInfo, binDesc, binType, (bool)normalized)) {
|
||||||
|
taosMemoryFree(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(binDesc);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -596,6 +596,7 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
int32_t code = stbSplAppendWEnd(pPartWin, &index);
|
int32_t code = stbSplAppendWEnd(pPartWin, &index);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
nodesDestroyNode(pMergeWin->pTsEnd);
|
||||||
pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
|
pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
|
||||||
if (NULL == pMergeWin->pTsEnd) {
|
if (NULL == pMergeWin->pTsEnd) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -2620,6 +2620,7 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
||||||
int32_t startIndex;
|
int32_t startIndex;
|
||||||
if (numOfParams != 4) {
|
if (numOfParams != 4) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2630,15 +2631,18 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
cJSON *infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
cJSON *infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
||||||
|
|
||||||
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
||||||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2656,12 +2660,14 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
// linear bin process
|
// linear bin process
|
||||||
if (width->valuedouble == 0) {
|
if (width->valuedouble == 0) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < counter + 1; ++i) {
|
for (int i = 0; i < counter + 1; ++i) {
|
||||||
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
||||||
if (isinf(intervals[startIndex])) {
|
if (isinf(intervals[startIndex])) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
startIndex++;
|
startIndex++;
|
||||||
|
@ -2670,22 +2676,26 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
// log bin process
|
// log bin process
|
||||||
if (start->valuedouble == 0) {
|
if (start->valuedouble == 0) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < counter + 1; ++i) {
|
for (int i = 0; i < counter + 1; ++i) {
|
||||||
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
||||||
if (isinf(intervals[startIndex])) {
|
if (isinf(intervals[startIndex])) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
startIndex++;
|
startIndex++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2700,6 +2710,7 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
}
|
}
|
||||||
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
||||||
if (binType != USER_INPUT_BIN) {
|
if (binType != USER_INPUT_BIN) {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
numOfBins = cJSON_GetArraySize(binDesc);
|
numOfBins = cJSON_GetArraySize(binDesc);
|
||||||
|
@ -2707,6 +2718,7 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
cJSON *bin = binDesc->child;
|
cJSON *bin = binDesc->child;
|
||||||
if (bin == NULL) {
|
if (bin == NULL) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -2714,16 +2726,19 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
intervals[i] = bin->valuedouble;
|
intervals[i] = bin->valuedouble;
|
||||||
if (!cJSON_IsNumber(bin)) {
|
if (!cJSON_IsNumber(bin)) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
bin = bin->next;
|
bin = bin->next;
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2735,8 +2750,9 @@ static bool getHistogramBinDesc(SHistoFuncBin **bins, int32_t *binNum, char *bin
|
||||||
(*bins)[i].count = 0;
|
(*bins)[i].count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON_Delete(binDesc);
|
|
||||||
taosMemoryFree(intervals);
|
taosMemoryFree(intervals);
|
||||||
|
cJSON_Delete(binDesc);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2748,14 +2764,19 @@ int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
|
||||||
int32_t numOfBins = 0;
|
int32_t numOfBins = 0;
|
||||||
int32_t totalCount = 0;
|
int32_t totalCount = 0;
|
||||||
|
|
||||||
int8_t binType = getHistogramBinType(varDataVal(pInput[1].columnData->pData));
|
char *binTypeStr = strndup(varDataVal(pInput[1].columnData->pData), varDataLen(pInput[1].columnData->pData));
|
||||||
char *binDesc = varDataVal(pInput[2].columnData->pData);
|
int8_t binType = getHistogramBinType(binTypeStr);
|
||||||
|
taosMemoryFree(binTypeStr);
|
||||||
|
|
||||||
|
char *binDesc = strndup(varDataVal(pInput[2].columnData->pData), varDataLen(pInput[2].columnData->pData));
|
||||||
int64_t normalized = *(int64_t *)(pInput[3].columnData->pData);
|
int64_t normalized = *(int64_t *)(pInput[3].columnData->pData);
|
||||||
|
|
||||||
int32_t type = GET_PARAM_TYPE(pInput);
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
if (!getHistogramBinDesc(&bins, &numOfBins, binDesc, binType, (bool)normalized)) {
|
if (!getHistogramBinDesc(&bins, &numOfBins, binDesc, binType, (bool)normalized)) {
|
||||||
|
taosMemoryFree(binDesc);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(binDesc);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
if (colDataIsNull_s(pInputData, i)) {
|
if (colDataIsNull_s(pInputData, i)) {
|
||||||
|
@ -2785,6 +2806,8 @@ int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
colInfoDataEnsureCapacity(pOutputData, numOfBins, false);
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
|
|
|
@ -73,11 +73,12 @@ typedef struct SSyncTimer {
|
||||||
SSyncHbTimerData hbData;
|
SSyncHbTimerData hbData;
|
||||||
} SSyncTimer;
|
} SSyncTimer;
|
||||||
|
|
||||||
typedef struct SElectTimer {
|
typedef struct SElectTimerParam {
|
||||||
uint64_t logicClock;
|
uint64_t logicClock;
|
||||||
SSyncNode* pSyncNode;
|
SSyncNode* pSyncNode;
|
||||||
|
int64_t executeTime;
|
||||||
void* pData;
|
void* pData;
|
||||||
} SElectTimer;
|
} SElectTimerParam;
|
||||||
|
|
||||||
typedef struct SPeerState {
|
typedef struct SPeerState {
|
||||||
SyncIndex lastSendIndex;
|
SyncIndex lastSendIndex;
|
||||||
|
@ -153,6 +154,7 @@ typedef struct SSyncNode {
|
||||||
uint64_t electTimerLogicClock;
|
uint64_t electTimerLogicClock;
|
||||||
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
|
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
|
||||||
uint64_t electTimerCounter;
|
uint64_t electTimerCounter;
|
||||||
|
SElectTimerParam electTimerParam;
|
||||||
|
|
||||||
// heartbeat timer
|
// heartbeat timer
|
||||||
tmr_h pHeartbeatTimer;
|
tmr_h pHeartbeatTimer;
|
||||||
|
|
|
@ -1104,8 +1104,16 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
pSyncNode->electTimerMS = ms;
|
pSyncNode->electTimerMS = ms;
|
||||||
|
|
||||||
|
int64_t execTime = taosGetTimestampMs() + ms;
|
||||||
|
atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime);
|
||||||
|
atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
|
||||||
|
pSyncNode->electTimerParam.pSyncNode = pSyncNode;
|
||||||
|
pSyncNode->electTimerParam.pData = NULL;
|
||||||
|
|
||||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
|
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
|
||||||
&pSyncNode->pElectTimer);
|
&pSyncNode->pElectTimer);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
@ -1855,27 +1863,35 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
SSyncNode* pNode = param;
|
|
||||||
|
|
||||||
if (!syncIsInit()) return;
|
if (!syncIsInit()) return;
|
||||||
|
|
||||||
|
SSyncNode* pNode = (SSyncNode*)param;
|
||||||
|
|
||||||
if (pNode == NULL) return;
|
if (pNode == NULL) return;
|
||||||
if (pNode->syncEqMsg == NULL) return;
|
if (pNode->syncEqMsg == NULL) return;
|
||||||
|
|
||||||
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
|
if (tsNow < pNode->electTimerParam.executeTime) return;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code =
|
int32_t code =
|
||||||
syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerLogicClock, pNode->electTimerMS, pNode);
|
syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to build elect msg");
|
sError("failed to build elect msg");
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncTimeout* pTimeout = rpcMsg.pCont;
|
SyncTimeout* pTimeout = rpcMsg.pCont;
|
||||||
sTrace("enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
|
sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
|
||||||
|
|
||||||
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
|
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to sync enqueue elect msg since %s", terrstr());
|
sError("failed to sync enqueue elect msg since %s", terrstr());
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
|
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
|
||||||
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
||||||
int64_t currentTerm = pNode->pRaftStore->currentTerm;
|
int64_t currentTerm = pNode->pRaftStore->currentTerm;
|
||||||
|
|
||||||
// save error code, otherwise it will be overwritten
|
// save error code, otherwise it will be overwritten
|
||||||
|
@ -252,7 +252,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
||||||
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
||||||
const char* format, ...) {
|
const char* format, ...) {
|
||||||
SSyncNode* pNode = pSender->pSyncNode;
|
SSyncNode* pNode = pSender->pSyncNode;
|
||||||
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
|
@ -304,7 +304,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
||||||
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
||||||
const char* format, ...) {
|
const char* format, ...) {
|
||||||
SSyncNode* pNode = pReceiver->pSyncNode;
|
SSyncNode* pNode = pReceiver->pSyncNode;
|
||||||
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
|
@ -554,4 +554,4 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
|
||||||
pMsg->voteGranted, s);
|
pMsg->voteGranted, s);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue