[TD-13063]<feature>: 3.0 on windows (#10765)
* [TD-13063]<feature>: 3.0 on Windows * add pthread in contrib * fix linux compile * fix osSemaphore * add gnu regex for Windows * fix compile error for Windows * support arm platform * port more OS files * fix for Windows compile * port more files * fix macOS on x86_64 * port osFile * port osSemaphone.h * port osSocket.c * port tconfig.c * port ttimer.c * add couple files * merge with 3.0
This commit is contained in:
parent
82431b840b
commit
3c6c518470
|
@ -42,7 +42,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
|
|||
|
||||
SET(TD_DARWIN TRUE)
|
||||
SET(OSTYPE "macOS")
|
||||
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
|
||||
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare -Wno-return-type")
|
||||
|
||||
MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.")
|
||||
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64")
|
||||
|
|
|
@ -33,7 +33,7 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
setErrno(pRequest, code);
|
||||
|
||||
free(pMsg->pData);
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -42,7 +42,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
free(pMsg->pData);
|
||||
setErrno(pRequest, code);
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
pTscObj->pAppInfo->numOfConns);
|
||||
|
||||
free(pMsg->pData);
|
||||
sem_post(&pRequest->body.rspSem);
|
||||
tsem_post(&pRequest->body.rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -96,12 +96,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
|||
CM_ENCODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != 0) {
|
||||
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||
mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("consumer:%ld, encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
|
||||
mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
|
@ -140,7 +140,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
|||
CM_DECODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||
mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||
tfree(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -149,17 +149,17 @@ CM_DECODE_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||
mTrace("consumer:%ld, perform insert action", pConsumer->consumerId);
|
||||
mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||
mTrace("consumer:%ld, perform delete action", pConsumer->consumerId);
|
||||
mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
|
||||
mTrace("consumer:%ld, perform update action", pOldConsumer->consumerId);
|
||||
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
|
||||
|
||||
// TODO handle update
|
||||
/*taosWLockLatch(&pOldConsumer->lock);*/
|
||||
|
|
|
@ -448,7 +448,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
|
||||
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
|
||||
|
||||
mInfo("mq remove lost consumer %ld", lostConsumerId);
|
||||
mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId);
|
||||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
|
||||
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
|
||||
|
@ -479,7 +479,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
else
|
||||
vgThisConsumerAfterRb = vgEachConsumer;
|
||||
|
||||
mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
|
||||
mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId,
|
||||
vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
|
||||
|
||||
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
|
||||
|
@ -503,7 +503,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
|
||||
}
|
||||
|
||||
mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);
|
||||
mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status);
|
||||
|
||||
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
||||
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||
|
@ -537,13 +537,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, topic,
|
||||
pConsumerEp->consumerId);
|
||||
|
||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
} else {
|
||||
mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
|
||||
mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId,
|
||||
pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
|
||||
|
||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||
|
@ -1099,7 +1099,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
|
||||
bool createSub = false;
|
||||
if (pSub == NULL) {
|
||||
mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName);
|
||||
mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, newTopicName);
|
||||
pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
|
||||
createSub = true;
|
||||
|
||||
|
@ -1118,7 +1118,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
pConsumerEp->consumerId = consumerId;
|
||||
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
||||
if (pConsumerEp->oldConsumerId == -1) {
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName,
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
|
||||
pConsumerEp->consumerId);
|
||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||
} else {
|
||||
|
|
|
@ -196,7 +196,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
|
||||
if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) {
|
||||
if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
|
||||
code = TSDB_CODE_CHECKSUM_ERROR;
|
||||
mError("failed to read file:%s since %s", file, tstrerror(code));
|
||||
break;
|
||||
|
|
|
@ -23,6 +23,7 @@ static SDataSinkManager gDataSinkManager = {0};
|
|||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
|
||||
gDataSinkManager.cfg = *cfg;
|
||||
pthread_mutex_init(&gDataSinkManager.mutex, NULL);
|
||||
return 0; // to avoid compiler eror
|
||||
}
|
||||
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
|
||||
|
|
|
@ -413,7 +413,7 @@ int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
|
|||
|
||||
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
|
||||
printf("==================== linear hash ====================\n");
|
||||
printf("total bucket:%d, size:%ld, ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO);
|
||||
printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO);
|
||||
|
||||
dBufSetPrintInfo(pHashObj->pBuf);
|
||||
|
||||
|
@ -425,4 +425,4 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
|
|||
} else {
|
||||
dBufPrintStatis(pHashObj->pBuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
|||
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
|
||||
++(pSyncEnv->envTickTimerCounter);
|
||||
sTrace(
|
||||
"syncEnvTick do ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
|
||||
"syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
|
@ -64,7 +64,7 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
|
||||
} else {
|
||||
sTrace(
|
||||
"syncEnvTick pass ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
|
||||
"syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", "
|
||||
"envTickTimerMS:%d, tmrId:%p",
|
||||
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
|
||||
pSyncEnv->envTickTimerMS, tmrId);
|
||||
|
@ -74,7 +74,7 @@ static void syncEnvTick(void *param, void *tmrId) {
|
|||
static SSyncEnv *doSyncEnvStart() {
|
||||
SSyncEnv *pSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv));
|
||||
assert(pSyncEnv != NULL);
|
||||
memset(pSyncEnv, 0, sizeof(pSyncEnv));
|
||||
memset(pSyncEnv, 0, sizeof(SSyncEnv));
|
||||
|
||||
pSyncEnv->envTickTimerCounter = 0;
|
||||
pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS;
|
||||
|
|
|
@ -420,46 +420,46 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
|
||||
// tla+ log vars
|
||||
cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore));
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->commitIndex);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
|
||||
// ping timer
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
|
||||
cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClock);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock);
|
||||
cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser);
|
||||
cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB);
|
||||
cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter);
|
||||
cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);
|
||||
|
||||
// elect timer
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
|
||||
cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClock);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock);
|
||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser);
|
||||
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
|
||||
cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter);
|
||||
cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);
|
||||
|
||||
// heartbeat timer
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
|
||||
cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClock);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock);
|
||||
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser);
|
||||
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB);
|
||||
cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter);
|
||||
cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);
|
||||
|
||||
// callback
|
||||
|
@ -634,7 +634,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pPingTimer);
|
||||
} else {
|
||||
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu",
|
||||
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "",
|
||||
pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
|
||||
}
|
||||
}
|
||||
|
@ -655,7 +655,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
|||
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pPingTimer);
|
||||
} else {
|
||||
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%lu, electTimerLogicClockUser:%lu",
|
||||
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "",
|
||||
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
|
||||
}
|
||||
}
|
||||
|
@ -676,7 +676,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pHeartbeatTimer);
|
||||
} else {
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 "",
|
||||
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||
}
|
||||
}
|
||||
|
@ -713,4 +713,4 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -218,7 +218,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->logicClock);
|
||||
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
|
||||
|
@ -239,7 +239,7 @@ char* syncTimeout2Str(const SyncTimeout* pMsg) {
|
|||
// for debug ----------------------
|
||||
void syncTimeoutPrint(const SyncTimeout* pMsg) {
|
||||
char* serialized = syncTimeout2Str(pMsg);
|
||||
printf("syncTimeoutPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||
printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
free(serialized);
|
||||
}
|
||||
|
@ -349,7 +349,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -364,7 +364,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
|
|||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
|
@ -512,7 +512,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -527,7 +527,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
|
|||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
|
@ -565,27 +565,27 @@ char* syncPingReply2Str(const SyncPingReply* pMsg) {
|
|||
// for debug ----------------------
|
||||
void syncPingReplyPrint(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||
printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
free(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
printf("syncPingReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
free(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog(const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog | len:%lu | %s", strlen(serialized), serialized);
|
||||
sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized);
|
||||
free(serialized);
|
||||
}
|
||||
|
||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
||||
char* serialized = syncPingReply2Str(pMsg);
|
||||
sTrace("syncPingReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||
sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
||||
free(serialized);
|
||||
}
|
||||
|
||||
|
@ -670,7 +670,7 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->seqNum);
|
||||
cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak);
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
|
@ -792,7 +792,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -820,11 +820,11 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogIndex);
|
||||
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogTerm);
|
||||
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
|
@ -936,7 +936,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -964,7 +964,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
|
||||
|
||||
|
@ -1079,7 +1079,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -1094,7 +1094,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
|||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
|
@ -1108,16 +1108,16 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogIndex);
|
||||
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogTerm);
|
||||
cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commit_index", u64buf);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
|
@ -1238,7 +1238,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
|
@ -1253,7 +1253,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
|||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
|
@ -1267,10 +1267,10 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
|||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->matchIndex);
|
||||
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
|
|
|
@ -34,6 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
|||
pLogStore->getLastTerm = logStoreLastTerm;
|
||||
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
|
||||
pLogStore->getCommitIndex = logStoreGetCommitIndex;
|
||||
return pLogStore; // to avoid compiler error
|
||||
}
|
||||
|
||||
void logStoreDestory(SSyncLogStore* pLogStore) {
|
||||
|
@ -58,6 +59,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
|
||||
walFsync(pWal, true);
|
||||
free(serialized);
|
||||
return code; // to avoid compiler error
|
||||
}
|
||||
|
||||
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||
|
@ -79,6 +81,7 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
|||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
walRollback(pWal, fromIndex);
|
||||
return 0; // to avoid compiler error
|
||||
}
|
||||
|
||||
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
|
||||
|
@ -102,6 +105,7 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
walCommit(pWal, index);
|
||||
return 0; // to avoid compiler error
|
||||
}
|
||||
|
||||
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
|
||||
|
@ -130,9 +134,9 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
|||
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
|
||||
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", logStoreLastIndex(pLogStore));
|
||||
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", logStoreLastTerm(pLogStore));
|
||||
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
||||
|
||||
cJSON* pEntries = cJSON_CreateArray();
|
||||
|
@ -181,4 +185,4 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
|
|||
char* serialized = logStore2Str(pLogStore);
|
||||
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||
free(serialized);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
|||
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) {
|
|||
char u64buf[128];
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", p->addr);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", p->addr);
|
||||
cJSON_AddStringToObject(pRoot, "addr", u64buf);
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
|
@ -196,4 +196,4 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b) {
|
|||
SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b) {
|
||||
SyncIndex r = a > b ? a : b;
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue