diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 69c2618ac8..99360b8d3f 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -56,6 +56,11 @@ typedef enum { TSDB_STATIS_NONE = 1, // statis part not exist } ETsdbStatisStatus; +typedef enum { + TSDB_SMA_STAT_OK = 0, // ready to provide service + TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired +} ETsdbSmaStat; + extern char *qtypeStr[]; #ifdef __cplusplus diff --git a/include/os/osDir.h b/include/os/osDir.h index 223c603352..6cf28fb878 100644 --- a/include/os/osDir.h +++ b/include/os/osDir.h @@ -16,10 +16,24 @@ #ifndef _TD_OS_DIR_H_ #define _TD_OS_DIR_H_ +// If the error is in a third-party library, place this header file under the third-party library header file. +#ifndef ALLOW_FORBID_FUNC + #define opendir OPENDIR_FUNC_TAOS_FORBID + #define readdir READDIR_FUNC_TAOS_FORBID + #define closedir CLOSEDIR_FUNC_TAOS_FORBID + #define dirname DIRNAME_FUNC_TAOS_FORBID + #undef basename + #define basename BASENAME_FUNC_TAOS_FORBID +#endif + #ifdef __cplusplus extern "C" { #endif +typedef struct TdDir *TdDirPtr; +typedef struct TdDirEntry *TdDirEntryPtr; + + void taosRemoveDir(const char *dirname); bool taosDirExist(char *dirname); int32_t taosMkDir(const char *dirname); @@ -27,6 +41,14 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays); int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen); int32_t taosRealPath(char *dirname, int32_t maxlen); bool taosIsDir(const char *dirname); +char* taosDirName(char *dirname); +char* taosDirEntryBaseName(char *dirname); + +TdDirPtr taosOpenDir(const char *dirname); +TdDirEntryPtr taosReadDir(TdDirPtr pDir); +bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry); +char* taosGetDirEntryName(TdDirEntryPtr pDirEntry); +int32_t taosCloseDir(TdDirPtr pDir); #ifdef __cplusplus } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 319bd9fde6..b6dd90a4e2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -59,7 +59,7 @@ typedef struct { SWalCfg walCfg; uint32_t hashBegin; uint32_t hashEnd; - int8_t hashMethod; + int8_t hashMethod; } SVnodeCfg; typedef struct { @@ -202,6 +202,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); /* ------------------------- TQ READ --------------------------- */ +enum { + TQ_STREAM_TOKEN__DATA = 1, + TQ_STREAM_TOKEN__WATERMARK, + TQ_STREAM_TOKEN__CHECKPOINT, +}; + +typedef struct { + int8_t type; + int8_t reserved[7]; + union { + void *data; + int64_t wmTs; + int64_t checkpointId; + }; +} STqStreamToken; + STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) { diff --git a/source/dnode/vnode/src/inc/tqPush.h b/source/dnode/vnode/src/inc/tqPush.h index 32fd7c3ddf..a6121c5dc1 100644 --- a/source/dnode/vnode/src/inc/tqPush.h +++ b/source/dnode/vnode/src/inc/tqPush.h @@ -16,9 +16,11 @@ #ifndef _TQ_PUSH_H_ #define _TQ_PUSH_H_ +#include "executor.h" #include "thash.h" #include "trpc.h" #include "ttimer.h" +#include "vnode.h" #ifdef __cplusplus extern "C" { @@ -39,11 +41,12 @@ typedef struct { } STqClientPusher; typedef struct { - int8_t type; - int8_t nodeType; - int8_t reserved[6]; - int64_t streamId; - SEpSet epSet; + int8_t type; + int8_t nodeType; + int8_t reserved[6]; + int64_t streamId; + qTaskInfo_t task; + // TODO sync function } STqStreamPusher; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 96a76ea7d4..1451ac9685 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -52,6 +52,7 @@ struct STsdb { STsdbFS * fs; SMeta * pMeta; STfs * pTfs; + SSmaStat * pSmaStat; }; #define REPO_ID(r) ((r)->vgId) diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 641255a294..173e991631 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,7 +42,10 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array - SArray * smaf; // sma data file array + + // SArray * v2f100.tsma.index_name + + SArray * smaf; // sma data file array v2f1900.tsma.index_name } SFSStatus; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index e4de7a6685..6e4ad909ae 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -16,6 +16,8 @@ #ifndef _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_ +typedef struct SSmaStat SSmaStat; + // insert/update interface int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); @@ -26,13 +28,14 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); // management interface -int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result); +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); - - - +int32_t tsdbFreeSmaState(SSmaStat *pSmaStat); // internal func + + static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; len += taosEncodeFixedU64(pData, tableUid); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index f49515412b..efdb3e0fe4 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -923,6 +923,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { SMetaDB *pDB = pMeta->pDB; DBC * pCur = NULL; DBT pkey = {0}, pval = {0}; + uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP; int ret; pUids = taosArrayInit(16, sizeof(tb_uid_t)); @@ -941,13 +942,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { void *pBuf = NULL; // TODO: lock? - while (true) { - ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP); - if(ret == 0) { + while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) { taosArrayPush(pUids, pkey.data); - continue; - } - break; } if (pCur) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 16809f1527..d15481b4aa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -67,6 +67,26 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + if (msgType != TDMT_VND_SUBMIT) return 0; + void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); + while (pIter != NULL) { + STqPusher* pusher = *(STqPusher**)pIter; + if (pusher->type == TQ_PUSHER_TYPE__STREAM) { + STqStreamPusher* streamPusher = (STqStreamPusher*)pusher; + // repack + STqStreamToken* token = malloc(sizeof(STqStreamToken)); + if (token == NULL) { + taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + token->type = TQ_STREAM_TOKEN__DATA; + token->data = msg; + // set input + // exec + } + // send msg to ep + } // iterate hash // process all msg // if waiting diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index fea65846be..4186f29e2a 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -73,7 +73,7 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet streamPusher->type = TQ_PUSHER_TYPE__STREAM; streamPusher->nodeType = 0; streamPusher->streamId = streamId; - memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet)); + /*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/ if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2f24df0309..92a111298f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "vnode.h" @@ -37,6 +36,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + // iterate and convert if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c new file mode 100644 index 0000000000..f2f48bbc8a --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 74fb8c1c1f..00e97c7b61 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -365,7 +365,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T if (errno == ENOENT) { // Try to create directory recursively char *s = strdup(TSDB_FILE_REL_NAME(pDFile)); - if (tfsMkdirRecurAt(pRepo->pTfs, dirname(s), TSDB_FILE_DID(pDFile)) < 0) { + if (tfsMkdirRecurAt(pRepo->pTfs, taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) { tfree(s); return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 2d8c470113..1b3e00f090 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -89,6 +89,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { tsdbFreeFS(pTsdb->fs); + tsdbFreeSmaState(pTsdb->pSmaStat); tfree(pTsdb->path); free(pTsdb); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index b465dc3a88..c5f1261282 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -21,6 +21,10 @@ #define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks +#define SMA_STATE_HASH_SLOT 4 +#define SMA_STATE_ITEM_HASH_SLOT 32 + +#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test typedef enum { SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} @@ -48,6 +52,22 @@ typedef struct { // TODO } STSmaReadH; +typedef struct { + /** + * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service. + * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, + * without information about its previous state. + * - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from + * Streaming Module or TSDB local persistence. + */ + int8_t state; // ETsdbSmaStat + SHashObj *expiredWindows; // key: skey of time window, value: N/A +} SSmaStatItem; + +struct SSmaStat { + SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem +}; + // declaration of static functions static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); @@ -64,6 +84,125 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { + ASSERT(pSmaStat != NULL); + + if (*pSmaStat != NULL) { // no lock + return TSDB_CODE_SUCCESS; + } + + // TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew. + if (*pSmaStat == NULL) { + *pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat)); + if (*pSmaStat == NULL) { + // TODO: unlock + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + (*pSmaStat)->smaStatItems = + taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + + if ((*pSmaStat)->smaStatItems == NULL) { + tfree(*pSmaStat); + // TODO: unlock + return TSDB_CODE_FAILED; + } + } + // TODO: unlock + return TSDB_CODE_SUCCESS; +} + +static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { + SSmaStatItem *pItem = NULL; + + pItem = (SSmaStatItem *)calloc(1, sizeof(SSmaStatItem)); + if (pItem) { + pItem->state = state; + pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), + true, HASH_ENTRY_LOCK); + if (!pItem->expiredWindows) { + tfree(pItem); + } + } + return pItem; +} + +int32_t tsdbFreeSmaState(SSmaStat *pSmaStat) { + if (pSmaStat) { + // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. + SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL); + while (item != NULL) { + taosHashCleanup(item->expiredWindows); + item = taosHashIterate(pSmaStat->smaStatItems, item); + } + + taosHashCleanup(pSmaStat->smaStatItems); + free(pSmaStat); + } +} + +/** + * @brief Update expired window according to msg from stream computing module. + * + * @param pTsdb + * @param msg + * @return int32_t + */ +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { + if (msg == NULL) { + return TSDB_CODE_FAILED; + } + + tsdbInitSmaStat(&pTsdb->pSmaStat); // lazy mode + + // TODO: decode the msg => start + const char * indexName = SMA_TEST_INDEX_NAME; + const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; + TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; + int64_t now = taosGetTimestampMs(); + for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { + expiredWindows[i] = now + i; + } + + // TODO: decode the msg <= end + SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; + + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); + if (!pItem) { + pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state + if (!pItem) { + // Response to stream computing: OOM + // For query, if the indexName not found, the TSDB should tell query module to query raw TS data. + return TSDB_CODE_FAILED; + } + + if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { + // If error occurs during put smaStatItem, free the resources of pItem + taosHashCleanup(pItem->expiredWindows); + free(pItem); + return TSDB_CODE_FAILED; + } + } + + int8_t state = TSDB_SMA_STAT_EXPIRED; + for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { + if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) { + // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would + // tell query module to query raw TS data. + // N.B. + // 1) It is assumed to be extemely little probability event of fail to taosHashPut. + // 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired + // windows failed to put into hash table. + taosHashCleanup(pItem->expiredWindows); + taosHashRemove(pItemsHash, indexName, sizeof(indexName)); + return TSDB_CODE_FAILED; + } + } + + return TSDB_CODE_SUCCESS; +} + /** * @brief Judge the tSma storage level * @@ -484,6 +623,22 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow * @return int32_t */ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { + const char *indexName = param->indexName; + + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName)); + if (pItem == NULL) { + // mark all window as expired and notify query module to query raw TS data. + return TSDB_CODE_SUCCESS; + } + + int32_t nQueryWin = 0; + for (int32_t n = 0; n < nQueryWin; ++n) { + TSKEY thisWindow = n; + if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) { + // TODO: mark this window as expired. + } + } + STSmaReadH tReadH = {0}; tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index fd267fbf03..89f3f8ba8a 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -722,13 +722,13 @@ static SArray* tfileGetFileList(const char* path) { uint32_t version; SArray* files = taosArrayInit(4, sizeof(void*)); - DIR* dir = opendir(path); - if (NULL == dir) { + TdDirPtr pDir = taosOpenDir(path); + if (NULL == pDir) { return NULL; } - struct dirent* entry; - while ((entry = readdir(dir)) != NULL) { - char* file = entry->d_name; + TdDirEntryPtr pDirEntry; + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char* file = taosGetDirEntryName(pDirEntry); if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; } @@ -738,7 +738,7 @@ static SArray* tfileGetFileList(const char* path) { sprintf(buf, "%s/%s", path, file); taosArrayPush(files, &buf); } - closedir(dir); + taosCloseDir(pDir); taosArraySort(files, tfileCompare); tfileRmExpireFile(files); diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 913f34d6c2..f16d0445c6 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -59,7 +59,7 @@ typedef struct STfsDir { SDiskID did; char dirname[TSDB_FILENAME_LEN]; STfsFile tfile; - DIR *dir; + TdDirPtr pDir; STfs *pTfs; } STfsDir; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 2579490791..c46989dc5d 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -192,14 +192,14 @@ void tfsBasename(const STfsFile *pFile, char *dest) { char tname[TSDB_FILENAME_LEN] = "\0"; tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN); - tstrncpy(dest, basename(tname), TSDB_FILENAME_LEN); + tstrncpy(dest, taosDirEntryBaseName(tname), TSDB_FILENAME_LEN); } void tfsDirname(const STfsFile *pFile, char *dest) { char tname[TSDB_FILENAME_LEN] = "\0"; tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN); - tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN); + tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN); } int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } @@ -233,7 +233,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) { // the pointer directly in this recursion. // See // https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man3/dirname.3.html - char *dir = strdup(dirname(s)); + char *dir = strdup(taosDirName(s)); if (tfsMkdirRecurAt(pTfs, dir, diskId) < 0) { free(s); @@ -324,45 +324,46 @@ STfsDir *tfsOpendir(STfs *pTfs, const char *rname) { return pDir; } -const STfsFile *tfsReaddir(STfsDir *pDir) { - if (pDir == NULL || pDir->dir == NULL) return NULL; +const STfsFile *tfsReaddir(STfsDir *pTfsDir) { + if (pTfsDir == NULL || pTfsDir->pDir == NULL) return NULL; char bname[TMPNAME_LEN * 2] = "\0"; while (true) { - struct dirent *dp = NULL; - dp = readdir(pDir->dir); - if (dp != NULL) { + TdDirEntryPtr pDirEntry = NULL; + pDirEntry = taosReadDir(pTfsDir->pDir); + if (pDirEntry != NULL) { // Skip . and .. - if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue; + char *name = taosGetDirEntryName(pDirEntry); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - if (pDir->dirname == NULL || pDir->dirname[0] == 0) { - snprintf(bname, TMPNAME_LEN * 2, "%s", dp->d_name); + if (pTfsDir->dirname == NULL || pTfsDir->dirname[0] == 0) { + snprintf(bname, TMPNAME_LEN * 2, "%s", name); } else { - snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pDir->dirname, TD_DIRSEP, dp->d_name); + snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pTfsDir->dirname, TD_DIRSEP, name); } - tfsInitFile(pDir->pTfs, &pDir->tfile, pDir->did, bname); - return &pDir->tfile; + tfsInitFile(pTfsDir->pTfs, &pTfsDir->tfile, pTfsDir->did, bname); + return &pTfsDir->tfile; } - if (tfsOpendirImpl(pDir->pTfs, pDir) < 0) { + if (tfsOpendirImpl(pTfsDir->pTfs, pTfsDir) < 0) { return NULL; } - if (pDir->dir == NULL) { + if (pTfsDir->pDir == NULL) { terrno = TSDB_CODE_SUCCESS; return NULL; } } } -void tfsClosedir(STfsDir *pDir) { - if (pDir) { - if (pDir->dir != NULL) { - closedir(pDir->dir); - pDir->dir = NULL; +void tfsClosedir(STfsDir *pTfsDir) { + if (pTfsDir) { + if (pTfsDir->pDir != NULL) { + taosCloseDir(pTfsDir->pDir); + pTfsDir->pDir = NULL; } - free(pDir); + free(pTfsDir); } } @@ -487,29 +488,29 @@ static STfsDisk *tfsGetDiskByName(STfs *pTfs, const char *dir) { return pDisk; } -static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir) { +static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pTfsDir) { STfsDisk *pDisk = NULL; char adir[TMPNAME_LEN * 2] = "\0"; - if (pDir->dir != NULL) { - closedir(pDir->dir); - pDir->dir = NULL; + if (pTfsDir->pDir != NULL) { + taosCloseDir(pTfsDir->pDir); + pTfsDir->pDir = NULL; } while (true) { - pDisk = tfsNextDisk(pTfs, &pDir->iter); + pDisk = tfsNextDisk(pTfs, &pTfsDir->iter); if (pDisk == NULL) return 0; - pDir->did.level = pDisk->level; - pDir->did.id = pDisk->id; + pTfsDir->did.level = pDisk->level; + pTfsDir->did.id = pDisk->id; if (pDisk->path == NULL || pDisk->path[0] == 0) { - snprintf(adir, TMPNAME_LEN * 2, "%s", pDir->dirname); + snprintf(adir, TMPNAME_LEN * 2, "%s", pTfsDir->dirname); } else { - snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TD_DIRSEP, pDir->dirname); + snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TD_DIRSEP, pTfsDir->dirname); } - pDir->dir = opendir(adir); - if (pDir->dir != NULL) break; + pTfsDir->pDir = taosOpenDir(adir); + if (pTfsDir->pDir != NULL) break; } return 0; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 2a4f3497e4..248f758787 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -130,16 +130,16 @@ int walCheckAndRepairMeta(SWal* pWal) { regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); - DIR* dir = opendir(pWal->path); - if (dir == NULL) { + TdDirPtr pDir = taosOpenDir(pWal->path); + if (pDir == NULL) { wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return -1; } // scan log files and build new meta - struct dirent* ent; - while ((ent = readdir(dir)) != NULL) { - char* name = basename(ent->d_name); + TdDirEntryPtr pDirEntry; + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); int code = regexec(&logRegPattern, name, 0, NULL, 0); if (code == 0) { SWalFileInfo fileInfo; @@ -149,7 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) { } } - closedir(dir); + taosCloseDir(pDir); regfree(&logRegPattern); regfree(&idxRegPattern); @@ -337,25 +337,25 @@ static int walFindCurMetaVer(SWal* pWal) { regex_t walMetaRegexPattern; regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED); - DIR* dir = opendir(pWal->path); - if (dir == NULL) { + TdDirPtr pDir = taosOpenDir(pWal->path); + if (pDir == NULL) { wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return -1; } - struct dirent* ent; + TdDirEntryPtr pDirEntry; // find existing meta-ver[x].json int metaVer = -1; - while ((ent = readdir(dir)) != NULL) { - char* name = basename(ent->d_name); + while ((pDirEntry = taosReadDir(pDir)) != NULL) { + char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry)); int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); if (code == 0) { sscanf(name, "meta-ver%d", &metaVer); break; } } - closedir(dir); + taosCloseDir(pDir); regfree(&walMetaRegexPattern); return metaVer; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 699da75790..0f6ac0b214 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -34,9 +34,6 @@ int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) { int code; char fnameStr[WAL_FILE_LEN]; - if (ver == pWal->vers.lastVer) { - return 0; - } if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 230555e016..5bfea9ab5e 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -124,13 +124,8 @@ class WalRetentionEnv : public ::testing::Test { void SetUp() override { SWalCfg cfg; - cfg.rollPeriod = -1, - cfg.segSize = -1, - cfg.retentionPeriod = -1, - cfg.retentionSize = 0, - cfg.rollPeriod = 0, - cfg.vgId = 0, - cfg.level = TAOS_WAL_FSYNC; + cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0, + cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, &cfg); ASSERT(pWal != NULL); } @@ -241,6 +236,12 @@ TEST_F(WalCleanEnv, rollback) { ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); } + code = walRollback(pWal, 12); + ASSERT_NE(code, 0); + ASSERT_EQ(pWal->vers.lastVer, 9); + code = walRollback(pWal, 9); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->vers.lastVer, 8); code = walRollback(pWal, 5); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 4); @@ -324,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) { TEST_F(WalRetentionEnv, repairMeta1) { walResetEnv(); int code; - + int i; for (i = 0; i < 100; i++) { char newStr[100]; @@ -336,14 +337,14 @@ TEST_F(WalRetentionEnv, repairMeta1) { TearDown(); - //getchar(); + // getchar(); char buf[100]; sprintf(buf, "%s/meta-ver%d", pathName, 0); taosRemoveFile(buf); sprintf(buf, "%s/meta-ver%d", pathName, 1); taosRemoveFile(buf); SetUp(); - //getchar(); + // getchar(); ASSERT_EQ(pWal->vers.lastVer, 99); @@ -401,5 +402,4 @@ TEST_F(WalRetentionEnv, repairMeta1) { EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); } } - } diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 7d7382d83f..b4058b3c0e 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE +#define ALLOW_FORBID_FUNC #include "os.h" #include "osString.h" @@ -36,6 +37,10 @@ #include #include +typedef struct dirent dirent; +typedef struct DIR TdDir; +typedef struct dirent TdDirent; + void taosRemoveDir(const char *dirname) { DIR *dir = opendir(dirname); if (dir == NULL) return; @@ -149,4 +154,47 @@ bool taosIsDir(const char *dirname) { return false; } +char* taosDirName(char *name) { + return dirname(name); +} + +char* taosDirEntryBaseName(char *name) { + return basename(name); +} + +TdDirPtr taosOpenDir(const char *dirname) { + if (dirname == NULL) { + return NULL; + } + return (TdDirPtr)opendir(dirname); +} + +TdDirEntryPtr taosReadDir(TdDirPtr pDir) { + if (pDir == NULL) { + return NULL; + } + return (TdDirEntryPtr)readdir((DIR*)pDir); +} + +bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) { + if (pDirEntry == NULL) { + return false; + } + return (((dirent*)pDirEntry)->d_type & DT_DIR) != 0; +} + +char* taosGetDirEntryName(TdDirEntryPtr pDirEntry) { + if (pDirEntry == NULL) { + return NULL; + } + return ((dirent*)pDirEntry)->d_name; +} + +int32_t taosCloseDir(TdDirPtr pDir) { + if (pDir == NULL) { + return -1; + } + return closedir((DIR*)pDir); +} + #endif diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index 0d7066b5c8..e5b506e8d8 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -99,7 +99,7 @@ static void *sem_thread_routine(void *arg) { sem_port = mach_task_self(); kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); if (ret != KERN_SUCCESS) { - fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); sem_inited = -1; return NULL; } @@ -112,7 +112,7 @@ static void once_init(void) { int r = 0; r = pthread_create(&sem_thread, NULL, sem_thread_routine, NULL); if (r) { - fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); return; } while (sem_inited == 0) { @@ -139,14 +139,14 @@ struct tsem_s { }; int tsem_init(tsem_t *sem, int pshared, unsigned int value) { - // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem); + // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); if (*sem) { - fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } struct tsem_s *p = (struct tsem_s *)calloc(1, sizeof(*p)); if (!p) { - fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } @@ -162,7 +162,7 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { p->val = value; } while (0); if (r) { - fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } #elif defined(SEM_USE_POSIX) @@ -181,27 +181,27 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { int e = errno; if (e == EEXIST) continue; if (e == EINTR) continue; - fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, + fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, strerror(e)); abort(); } while (p->sem == SEM_FAILED); #elif defined(SEM_USE_SEM) pthread_once(&sem_once, once_init); if (sem_inited != 1) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); errno = ENOMEM; return -1; } kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); if (ret != KERN_SUCCESS) { - fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // we fail-fast here, because we have less-doc about semaphore_create for the moment abort(); } #else // SEM_USE_PTHREAD p->sem = dispatch_semaphore_create(value); if (p->sem == NULL) { - fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } #endif // SEM_USE_PTHREAD @@ -215,28 +215,28 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { int tsem_wait(tsem_t *sem) { if (!*sem) { - fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } struct tsem_s *p = *sem; if (!p->valid) { - fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } #ifdef SEM_USE_PTHREAD if (pthread_mutex_lock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } p->val -= 1; if (p->val < 0) { if (pthread_cond_wait(&p->cond, &p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } } if (pthread_mutex_unlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } return 0; @@ -251,28 +251,28 @@ int tsem_wait(tsem_t *sem) { int tsem_post(tsem_t *sem) { if (!*sem) { - fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } struct tsem_s *p = *sem; if (!p->valid) { - fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } #ifdef SEM_USE_PTHREAD if (pthread_mutex_lock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } p->val += 1; if (p->val <= 0) { if (pthread_cond_signal(&p->cond)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } } if (pthread_mutex_unlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } return 0; @@ -286,34 +286,34 @@ int tsem_post(tsem_t *sem) { } int tsem_destroy(tsem_t *sem) { - // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem); + // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); if (!*sem) { - // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); + // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); return 0; } struct tsem_s *p = *sem; if (!p->valid) { - // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); + // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); return 0; } #ifdef SEM_USE_PTHREAD if (pthread_mutex_lock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } p->valid = 0; if (pthread_cond_destroy(&p->cond)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } if (pthread_mutex_unlock(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } if (pthread_mutex_destroy(&p->lock)) { - fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem); + fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); abort(); } #elif defined(SEM_USE_POSIX) @@ -322,7 +322,7 @@ int tsem_destroy(tsem_t *sem) { int r = sem_unlink(name); if (r) { int e = errno; - fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, + fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, e, strerror(e)); abort(); } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 9330cb7b32..698ceded16 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -809,7 +809,8 @@ int32_t taosGetFqdn(char *fqdn) { char hostname[1024]; hostname[1023] = '\0'; if (gethostname(hostname, 1023) == -1) { - // printf("failed to get hostname, reason:%s", strerror(errno)); + printf("failed to get hostname, reason:%s", strerror(errno)); + assert(0); return -1; } @@ -826,7 +827,8 @@ int32_t taosGetFqdn(char *fqdn) { #endif // __APPLE__ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - // printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + assert(0); return -1; } diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index bb526e0ba0..6b60923189 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -79,7 +79,7 @@ static void* timer_routine(void* arg) { struct kevent64_s kev[10] = {0}; r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), 0, &to); if (r != 0) { - fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); abort(); } timer_callback(SIGALRM); // just mock @@ -97,14 +97,14 @@ int taosInitTimer(void (*callback)(int), int ms) { timer_kq = kqueue(); if (timer_kq == -1) { - fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // since no caller of this func checks the return value for the moment abort(); } r = pthread_create(&timer_thread, NULL, timer_routine, NULL); if (r) { - fprintf(stderr, "==%s[%d]%s()==failed to create timer thread\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==failed to create timer thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // since no caller of this func checks the return value for the moment abort(); } @@ -116,7 +116,7 @@ void taosUninitTimer() { timer_stop = 1; r = pthread_join(timer_thread, NULL); if (r) { - fprintf(stderr, "==%s[%d]%s()==failed to join timer thread\n", basename(__FILE__), __LINE__, __func__); + fprintf(stderr, "==%s[%d]%s()==failed to join timer thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // since no caller of this func checks the return value for the moment abort(); } diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 3eb8e60d56..cb7d7c67ce 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -192,11 +192,11 @@ static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } // calc dir size (not include itself 4096Byte) int64_t getDirectorySize(char *dir) { - DIR *dp; - struct dirent *entry; + TdDirPtr pDir; + TdDirEntryPtr pDirEntry; int64_t totalSize=0; - if ((dp = opendir(dir)) == NULL) { + if ((pDir = taosOpenDir(dir)) == NULL) { fprintf(stderr, "Cannot open dir: %s\n", dir); return -1; } @@ -204,26 +204,27 @@ int64_t getDirectorySize(char *dir) //lstat(dir, &statbuf); //totalSize+=statbuf.st_size; - while ((entry = readdir(dp)) != NULL) { + while ((pDirEntry = taosReadDir(pDir)) != NULL) { char subdir[1024]; - sprintf(subdir, "%s/%s", dir, entry->d_name); + char* fileName = taosGetDirEntryName(pDirEntry); + sprintf(subdir, "%s/%s", dir, fileName); //printf("===d_name: %s\n", entry->d_name); if (taosIsDir(subdir)) { - if (strcmp(".", entry->d_name) == 0 || strcmp("..", entry->d_name) == 0) { + if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) { continue; } int64_t subDirSize = getDirectorySize(subdir); totalSize+=subDirSize; - } else if (0 == strcmp(strchr(entry->d_name, '.'), ".log")) { // only calc .log file size, and not include .idx file + } else if (0 == strcmp(strchr(fileName, '.'), ".log")) { // only calc .log file size, and not include .idx file int64_t file_size = 0; taosStatFile(subdir, &file_size, NULL); totalSize+=file_size; } } - closedir(dp); + taosCloseDir(pDir); return totalSize; }