From 64363627fe2a406fad94b26e73cf099f10b4ea38 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 16 Jul 2024 17:30:32 +0800 Subject: [PATCH 1/5] enh: refact tqueue code --- include/util/tqueue.h | 56 +-- source/client/src/clientMonitor.c | 350 +++++++++--------- source/client/src/clientTmq.c | 102 +++-- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 12 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 7 +- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 7 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 8 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 +- source/dnode/vnode/src/sma/smaRollup.c | 75 ++-- source/dnode/vnode/src/tq/tqUtil.c | 66 ++-- source/libs/executor/src/dataDeleter.c | 17 +- source/libs/executor/src/dataDispatcher.c | 20 +- source/libs/qcom/src/queryUtil.c | 4 +- source/libs/stream/src/streamCheckpoint.c | 10 +- source/libs/stream/src/streamData.c | 44 ++- source/libs/stream/src/streamQueue.c | 66 ++-- source/libs/stream/src/streamSched.c | 9 +- source/libs/stream/src/streamTask.c | 11 +- source/util/src/tqueue.c | 81 ++-- source/util/src/tworker.c | 156 ++++---- 20 files changed, 627 insertions(+), 478 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index bed218ac1b..f7eaf794b0 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -73,36 +73,36 @@ struct STaosQnode { char item[]; }; -STaosQueue *taosOpenQueue(); -void taosCloseQueue(STaosQueue *queue); -void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); -void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize); -void taosFreeQitem(void *pItem); -int32_t taosWriteQitem(STaosQueue *queue, void *pItem); -int32_t taosReadQitem(STaosQueue *queue, void **ppItem); -bool taosQueueEmpty(STaosQueue *queue); -void taosUpdateItemSize(STaosQueue *queue, int32_t items); -int32_t taosQueueItemSize(STaosQueue *queue); -int64_t taosQueueMemorySize(STaosQueue *queue); -void taosSetQueueCapacity(STaosQueue *queue, int64_t size); -void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t mem); +int32_t taosOpenQueue(STaosQueue **queue); +void taosCloseQueue(STaosQueue *queue); +void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); +int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item); +void taosFreeQitem(void *pItem); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem); +int32_t taosReadQitem(STaosQueue *queue, void **ppItem); +bool taosQueueEmpty(STaosQueue *queue); +void taosUpdateItemSize(STaosQueue *queue, int32_t items); +int32_t taosQueueItemSize(STaosQueue *queue); +int64_t taosQueueMemorySize(STaosQueue *queue); +void taosSetQueueCapacity(STaosQueue *queue, int64_t size); +void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t mem); -STaosQall *taosAllocateQall(); -void taosFreeQall(STaosQall *qall); -int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); -int32_t taosGetQitem(STaosQall *qall, void **ppItem); -void taosResetQitems(STaosQall *qall); -int32_t taosQallItemSize(STaosQall *qall); -int64_t taosQallMemSize(STaosQall *qll); -int64_t taosQallUnAccessedItemSize(STaosQall *qall); -int64_t taosQallUnAccessedMemSize(STaosQall *qall); +int32_t taosAllocateQall(STaosQall **qall); +void taosFreeQall(STaosQall *qall); +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); +int32_t taosGetQitem(STaosQall *qall, void **ppItem); +void taosResetQitems(STaosQall *qall); +int32_t taosQallItemSize(STaosQall *qall); +int64_t taosQallMemSize(STaosQall *qll); +int64_t taosQallUnAccessedItemSize(STaosQall *qall); +int64_t taosQallUnAccessedMemSize(STaosQall *qall); -STaosQset *taosOpenQset(); -void taosCloseQset(STaosQset *qset); -void taosQsetThreadResume(STaosQset *qset); -int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); -void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); -int32_t taosGetQueueNumber(STaosQset *qset); +int32_t taosOpenQset(STaosQset **qset); +void taosCloseQset(STaosQset *qset); +void taosQsetThreadResume(STaosQset *qset); +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); +int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index ae2a57ba97..304f18cd68 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -1,13 +1,13 @@ #include "clientMonitor.h" -#include "clientLog.h" -#include "os.h" -#include "tmisce.h" -#include "ttime.h" -#include "ttimer.h" -#include "tglobal.h" -#include "tqueue.h" #include "cJSON.h" #include "clientInt.h" +#include "clientLog.h" +#include "os.h" +#include "tglobal.h" +#include "tmisce.h" +#include "tqueue.h" +#include "ttime.h" +#include "ttimer.h" SRWLatch monitorLock; void* monitorTimer; @@ -20,41 +20,41 @@ STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; -static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size){ +static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { if (tsTempDir == NULL) { return -1; } int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); - if (ret < 0){ + if (ret < 0) { tscError("failed to get tmp path ret:%d", ret); return ret; } return 0; } -static void processFileInTheEnd(TdFilePtr pFile, char* path){ - if(pFile == NULL){ +static void processFileInTheEnd(TdFilePtr pFile, char* path) { + if (pFile == NULL) { return; } - if(taosFtruncateFile(pFile, 0) != 0){ + if (taosFtruncateFile(pFile, 0) != 0) { tscError("failed to truncate file:%s, errno:%d", path, errno); return; } - if(taosUnLockFile(pFile) != 0){ + if (taosUnLockFile(pFile) != 0) { tscError("failed to unlock file:%s, errno:%d", path, errno); return; } - if(taosCloseFile(&(pFile)) != 0){ + if (taosCloseFile(&(pFile)) != 0) { tscError("failed to close file:%s, errno:%d", path, errno); return; } - if(taosRemoveFile(path) != 0){ + if (taosRemoveFile(path) != 0) { tscError("failed to remove file:%s, errno:%d", path, errno); return; } } -static void destroySlowLogClient(void* data){ +static void destroySlowLogClient(void* data) { if (data == NULL) { return; } @@ -63,40 +63,40 @@ static void destroySlowLogClient(void* data){ taosMemoryFree(slowLogClient); } -static void destroyMonitorClient(void* data){ +static void destroyMonitorClient(void* data) { if (data == NULL) { return; } MonitorClient* pMonitor = *(MonitorClient**)data; - if(pMonitor == NULL){ + if (pMonitor == NULL) { return; } taosTmrStopA(&pMonitor->timer); taosHashCleanup(pMonitor->counters); taos_collector_registry_destroy(pMonitor->registry); -// taos_collector_destroy(pMonitor->colector); + // taos_collector_destroy(pMonitor->colector); taosMemoryFree(pMonitor); } -static void monitorFreeSlowLogData(void *paras) { +static void monitorFreeSlowLogData(void* paras) { MonitorSlowLogData* pData = (MonitorSlowLogData*)paras; if (pData == NULL) { return; } taosMemoryFreeClear(pData->data); - if (pData->type == SLOW_LOG_READ_BEGINNIG){ + if (pData->type == SLOW_LOG_READ_BEGINNIG) { taosMemoryFree(pData->fileName); } } -static void monitorFreeSlowLogDataEx(void *paras) { +static void monitorFreeSlowLogDataEx(void* paras) { monitorFreeSlowLogData(paras); taosMemoryFree(paras); } static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { - void *p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); - if(p == NULL){ + void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES); + if (p == NULL) { tscError("failed to get app inst, clusterId:%" PRIx64, clusterId); return NULL; } @@ -111,21 +111,25 @@ static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } - if(param != NULL){ + if (param != NULL) { MonitorSlowLogData* p = (MonitorSlowLogData*)param; - if(code != 0){ + if (code != 0) { tscError("failed to send slow log:%s, clusterId:%" PRIx64, p->data, p->clusterId); } - MonitorSlowLogData tmp = {.clusterId = p->clusterId, .type = p->type, .fileName = p->fileName, - .pFile= p->pFile, .offset = p->offset, .data = NULL}; - if(monitorPutData2MonitorQueue(tmp) == 0){ + MonitorSlowLogData tmp = {.clusterId = p->clusterId, + .type = p->type, + .fileName = p->fileName, + .pFile = p->pFile, + .offset = p->offset, + .data = NULL}; + if (monitorPutData2MonitorQueue(tmp) == 0) { p->fileName = NULL; } } return code; } -static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITOR_TYPE type, void* param) { +static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) { SStatisReq sStatisReq; sStatisReq.pCont = pCont; sStatisReq.contLen = strlen(pCont); @@ -167,11 +171,11 @@ FAILED: return -1; } -static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet *epSet) { +static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) { char ts[50] = {0}; sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); - if(NULL == pCont) { + if (NULL == pCont) { tscError("generateClusterReport failed, get null content."); return; } @@ -190,8 +194,8 @@ static void reportSendProcess(void* param, void* tmrId) { } MonitorClient* pMonitor = (MonitorClient*)param; - SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); - if(pInst == NULL){ + SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); + if (pInst == NULL) { taosRUnLockLatch(&monitorLock); return; } @@ -202,15 +206,15 @@ static void reportSendProcess(void* param, void* tmrId) { taosRUnLockLatch(&monitorLock); } -static void sendAllCounter(){ +static void sendAllCounter() { MonitorClient** ppMonitor = NULL; while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) { MonitorClient* pMonitor = *ppMonitor; - if (pMonitor == NULL){ + if (pMonitor == NULL) { continue; } SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); - if(pInst == NULL){ + if (pInst == NULL) { taosHashCancelIterate(monitorCounterHash, ppMonitor); break; } @@ -231,46 +235,48 @@ void monitorCreateClient(int64_t clusterId) { } pMonitor->clusterId = clusterId; char clusterKey[32] = {0}; - if(snprintf(clusterKey, sizeof(clusterKey), "%"PRId64, clusterId) < 0){ + if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) { tscError("failed to create cluster key"); goto fail; } pMonitor->registry = taos_collector_registry_new(clusterKey); - if(pMonitor->registry == NULL){ + if (pMonitor->registry == NULL) { tscError("failed to create registry"); goto fail; } pMonitor->colector = taos_collector_new(clusterKey); - if(pMonitor->colector == NULL){ + if (pMonitor->colector == NULL) { tscError("failed to create collector"); goto fail; } taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector); - pMonitor->counters = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pMonitor->counters = + (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pMonitor->counters == NULL) { tscError("failed to create monitor counters"); goto fail; } -// taosHashSetFreeFp(pMonitor->counters, destroyCounter); + // taosHashSetFreeFp(pMonitor->counters, destroyCounter); - if(taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0){ + if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) { tscError("failed to put monitor client to hash"); goto fail; } SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ + if (pInst == NULL) { tscError("failed to get app instance by cluster id"); pMonitor = NULL; goto fail; } - pMonitor->timer = taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); - if(pMonitor->timer == NULL){ + pMonitor->timer = + taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); + if (pMonitor->timer == NULL) { tscError("failed to start timer"); goto fail; } - tscInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor); + tscInfo("[monitor] monitorCreateClient for %" PRIx64 "finished %p.", clusterId, pMonitor); } taosWUnLockLatch(&monitorLock); if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) { @@ -283,7 +289,8 @@ fail: taosWUnLockLatch(&monitorLock); } -void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys) { +void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, + const char** label_keys) { taosWLockLatch(&monitorLock); MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); if (ppMonitor == NULL || *ppMonitor == NULL) { @@ -291,16 +298,16 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* goto end; } taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); - if (newCounter == NULL) - return; - MonitorClient* pMonitor = *ppMonitor; + if (newCounter == NULL) return; + MonitorClient* pMonitor = *ppMonitor; taos_collector_add_metric(pMonitor->colector, newCounter); - if(taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0){ + if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); taos_counter_destroy(newCounter); goto end; } - tscInfo("[monitor] monitorCreateClientCounter %"PRIx64"(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, newCounter); + tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, + newCounter); end: taosWUnLockLatch(&monitorLock); @@ -315,18 +322,18 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); if (ppMonitor == NULL || *ppMonitor == NULL) { - tscError("monitorCounterInc not found pMonitor %"PRId64, clusterId); + tscError("monitorCounterInc not found pMonitor %" PRId64, clusterId); goto end; } MonitorClient* pMonitor = *ppMonitor; taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName)); if (ppCounter == NULL || *ppCounter == NULL) { - tscError("monitorCounterInc not found pCounter %"PRIx64":%s.", clusterId, counterName); + tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); goto end; } taos_counter_inc(*ppCounter, label_values); - tscDebug("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName); + tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); end: taosWUnLockLatch(&monitorLock); @@ -339,13 +346,13 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } -static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ +static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) { TdFilePtr pFile = NULL; - void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); - if (tmp == NULL){ + void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); + if (tmp == NULL) { char path[PATH_MAX] = {0}; char clusterId[32] = {0}; - if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0){ + if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0) { tscError("failed to generate clusterId:%" PRIx64, slowLogData->clusterId); return; } @@ -358,8 +365,8 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP return; } - SlowLogClient *pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); - if (pClient == NULL){ + SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); + if (pClient == NULL) { tscError("failed to allocate memory for slow log client"); taosCloseFile(&pFile); return; @@ -368,58 +375,58 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpP strcpy(pClient->path, path); pClient->offset = 0; pClient->pFile = pFile; - if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0){ + if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); taosCloseFile(&pFile); taosMemoryFree(pClient); return; } - if(taosLockFile(pFile) < 0){ + if (taosLockFile(pFile) < 0) { tscError("failed to lock file:%p since %s", pFile, terrstr()); return; } - }else{ + } else { pFile = (*(SlowLogClient**)tmp)->pFile; } - if(taosLSeekFile(pFile, 0, SEEK_END) < 0){ + if (taosLSeekFile(pFile, 0, SEEK_END) < 0) { tscError("failed to seek file:%p code: %d", pFile, errno); return; } - if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0){ + if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) { tscError("failed to write len to file:%p since %s", pFile, terrstr()); } - tscDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); + tscDebug("[monitor] write slow log to file:%p, clusterId:%" PRIx64, pFile, slowLogData->clusterId); } -static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ - tscDebug("[monitor] readFile slow begin pFile:%p, offset:%"PRId64 ", size:%"PRId64, pFile, *offset, size); - if(taosLSeekFile(pFile, *offset, SEEK_SET) < 0){ +static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) { + tscDebug("[monitor] readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size); + if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) { tscError("failed to seek file:%p code: %d", pFile, errno); return NULL; } ASSERT(size > *offset); - char* pCont = NULL; + char* pCont = NULL; int64_t totalSize = 0; if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { - pCont = taosMemoryCalloc(1, 4 + SLOW_LOG_SEND_SIZE_MAX); //4 reserved for [] + pCont = taosMemoryCalloc(1, 4 + SLOW_LOG_SEND_SIZE_MAX); // 4 reserved for [] totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX; - }else{ + } else { pCont = taosMemoryCalloc(1, 4 + (size - *offset)); totalSize = 4 + (size - *offset); } - if(pCont == NULL){ + if (pCont == NULL) { tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize); return NULL; } - char* buf = pCont; + char* buf = pCont; strcat(buf++, "["); int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); if (readSize <= 0) { - if (readSize < 0){ + if (readSize < 0) { tscError("failed to read len from file:%p since %s", pFile, terrstr()); } taosMemoryFree(pCont); @@ -427,24 +434,24 @@ static char* readFile(TdFilePtr pFile, int64_t *offset, int64_t size){ } totalSize = 0; - while(1){ + while (1) { size_t len = strlen(buf); - totalSize += (len+1); + totalSize += (len + 1); if (totalSize > readSize || len == 0) { - *(buf-1) = ']'; + *(buf - 1) = ']'; *buf = '\0'; break; } - buf[len] = ','; // replace '\0' with ',' + buf[len] = ','; // replace '\0' with ',' buf += (len + 1); - *offset += (len+1); + *offset += (len + 1); } - tscDebug("[monitor] readFile slow log end, data:%s, offset:%"PRId64, pCont, *offset); + tscDebug("[monitor] readFile slow log end, data:%s, offset:%" PRId64, pCont, *offset); return pCont; } -static int64_t getFileSize(char* path){ +static int64_t getFileSize(char* path) { int64_t fileSize = 0; if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { return -1; @@ -453,13 +460,14 @@ static int64_t getFileSize(char* path){ return fileSize; } -static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, char* fileName, void* pTransporter, SEpSet *epSet){ - if (data == NULL){ +static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type, + char* fileName, void* pTransporter, SEpSet* epSet) { + if (data == NULL) { taosMemoryFree(fileName); return -1; } MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); - if(pParam == NULL){ + if (pParam == NULL) { taosMemoryFree(data); taosMemoryFree(fileName); return -1; @@ -473,121 +481,124 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam); } -static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, SLOW_LOG_QUEUE_TYPE type, char* fileName){ +static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size, + SLOW_LOG_QUEUE_TYPE type, char* fileName) { SAppInstInfo* pInst = getAppInstByClusterId(clusterId); - if(pInst == NULL){ + if (pInst == NULL) { tscError("failed to get app instance by clusterId:%" PRId64, clusterId); return -1; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); - char* data = readFile(pFile, offset, size); - return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, pInst->pTransporter, &ep); + char* data = readFile(pFile, offset, size); + return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, + pInst->pTransporter, &ep); } -static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset){ - int64_t size = getFileSize(*fileName); - if(size <= offset){ +static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { + int64_t size = getFileSize(*fileName); + if (size <= offset) { processFileInTheEnd(pFile, *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); - }else{ + } else { int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); - tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); *fileName = NULL; } } -static void monitorSendSlowLogAtRunning(int64_t clusterId){ +static void monitorSendSlowLogAtRunning(int64_t clusterId) { void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); - if (tmp == NULL){ + if (tmp == NULL) { return; } SlowLogClient* pClient = (*(SlowLogClient**)tmp); - if (pClient == NULL){ + if (pClient == NULL) { return; } - int64_t size = getFileSize(pClient->path); - if(size <= pClient->offset){ - if(taosFtruncateFile(pClient->pFile, 0) < 0){ + int64_t size = getFileSize(pClient->path); + if (size <= pClient->offset) { + if (taosFtruncateFile(pClient->pFile, 0) < 0) { tscError("failed to truncate file:%p code: %d", pClient->pFile, errno); } tscDebug("[monitor] monitorSendSlowLogAtRunning truncate file to 0 file:%p", pClient->pFile); pClient->offset = 0; - }else{ + } else { int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); - tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + tscDebug("[monitor] monitorSendSlowLogAtRunning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); } } static bool monitorSendSlowLogAtQuit(int64_t clusterId) { void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); - if (tmp == NULL){ + if (tmp == NULL) { return true; } SlowLogClient* pClient = (*(SlowLogClient**)tmp); - if (pClient == NULL){ + if (pClient == NULL) { return true; } int64_t size = getFileSize(pClient->path); - if(size <= pClient->offset){ + if (size <= pClient->offset) { processFileInTheEnd(pClient->pFile, pClient->path); pClient->pFile = NULL; tscInfo("[monitor] monitorSendSlowLogAtQuit remove file:%s", pClient->path); - if((--quitCnt) == 0){ + if ((--quitCnt) == 0) { return true; } - }else{ + } else { int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); - tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", clusterId, code); + tscDebug("[monitor] monitorSendSlowLogAtQuit send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); } return false; } -static void monitorSendAllSlowLogAtQuit(){ +static void monitorSendAllSlowLogAtQuit() { void* pIter = NULL; while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { SlowLogClient* pClient = (*(SlowLogClient**)pIter); - if(pClient == NULL) { + if (pClient == NULL) { continue; } int64_t size = getFileSize(pClient->path); - if(size <= pClient->offset){ + if (size <= pClient->offset) { processFileInTheEnd(pClient->pFile, pClient->path); pClient->pFile = NULL; - }else if(pClient->offset == 0){ + } else if (pClient->offset == 0) { int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); - int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); - tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); - if (code == 0){ - quitCnt ++; + int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL); + tscDebug("[monitor] monitorSendAllSlowLogAtQuit send slow log clusterId:%" PRId64 ",ret:%d", *clusterId, code); + if (code == 0) { + quitCnt++; } } } } -static void processFileRemoved(SlowLogClient* pClient){ +static void processFileRemoved(SlowLogClient* pClient) { taosUnLockFile(pClient->pFile); taosCloseFile(&(pClient->pFile)); - TdFilePtr pFile = taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); + TdFilePtr pFile = + taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); tscError("failed to open file:%s since %s", pClient->path, terrstr()); - }else{ + } else { pClient->pFile = pFile; } } -static void monitorSendAllSlowLog(){ +static void monitorSendAllSlowLog() { int64_t t = taosGetMonoTimestampMs(); - void* pIter = NULL; + void* pIter = NULL; while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) { int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SlowLogClient* pClient = (*(SlowLogClient**)pIter); - if (pClient == NULL){ + if (pClient == NULL) { taosHashCancelIterate(monitorSlowLogHash, pIter); return; } - if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000){ + if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000) { pClient->lastCheckTime = t; } else { continue; @@ -595,35 +606,35 @@ static void monitorSendAllSlowLog(){ if (pInst != NULL && pClient->offset == 0) { int64_t size = getFileSize(pClient->path); - if(size <= 0){ - if(size < 0){ + if (size <= 0) { + if (size < 0) { tscError("[monitor] monitorSendAllSlowLog failed to get file size:%s, err:%d", pClient->path, errno); - if(errno == ENOENT){ + if (errno == ENOENT) { processFileRemoved(pClient); } } continue; } int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL); - tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%"PRId64",ret:%d", *clusterId, code); + tscDebug("[monitor] monitorSendAllSlowLog send slow log clusterId:%" PRId64 ",ret:%d", *clusterId, code); } } } -static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ +static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); - if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ + if (pInst == NULL || !pInst->monitorParas.tsEnableMonitor) { tscInfo("[monitor] monitor is disabled, skip send slow log"); return; } char namePrefix[PATH_MAX] = {0}; - if (snprintf(namePrefix, sizeof(namePrefix), "%s%"PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) { + if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) { tscError("failed to generate slow log file name prefix"); return; } - char tmpPath[PATH_MAX] = {0}; + char tmpPath[PATH_MAX] = {0}; if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) { return; } @@ -639,11 +650,9 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ continue; } - char *name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || - strcmp(name, "..") == 0 || - strstr(name, namePrefix) == NULL) { - tscInfo("skip file:%s, for cluster id:%"PRIx64, name, clusterId); + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) { + tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId); continue; } @@ -659,7 +668,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ taosCloseFile(&pFile); continue; } - char *tmp = taosStrdup(filename); + char* tmp = taosStrdup(filename); monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0); taosMemoryFree(tmp); } @@ -667,7 +676,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ taosCloseDir(&pDir); } -static void* monitorThreadFunc(void *param){ +static void* monitorThreadFunc(void* param) { setThreadName("client-monitor-slowlog"); #ifdef WINDOWS @@ -680,18 +689,18 @@ static void* monitorThreadFunc(void *param){ return NULL; } tscDebug("monitorThreadFunc start"); - int64_t quitTime = 0; + int64_t quitTime = 0; while (1) { if (atomic_load_32(&slowLogFlag) > 0 > 0) { - if(quitCnt == 0){ + if (quitCnt == 0) { monitorSendAllSlowLogAtQuit(); - if(quitCnt == 0){ + if (quitCnt == 0) { tscInfo("monitorThreadFunc quit since no slow log to send"); break; } quitTime = taosGetMonoTimestampMs(); } - if(taosGetMonoTimestampMs() - quitTime > 500){ //quit at most 500ms + if (taosGetMonoTimestampMs() - quitTime > 500) { // quit at most 500ms tscInfo("monitorThreadFunc quit since timeout"); break; } @@ -700,18 +709,19 @@ static void* monitorThreadFunc(void *param){ MonitorSlowLogData* slowLogData = NULL; taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { - if (slowLogData->type == SLOW_LOG_READ_BEGINNIG){ - if(slowLogData->pFile != NULL){ - monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, slowLogData->offset); - }else{ + if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) { + if (slowLogData->pFile != NULL) { + monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile, + slowLogData->offset); + } else { monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); } - } else if(slowLogData->type == SLOW_LOG_WRITE){ + } else if (slowLogData->type == SLOW_LOG_WRITE) { monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath); - } else if(slowLogData->type == SLOW_LOG_READ_RUNNING){ + } else if (slowLogData->type == SLOW_LOG_READ_RUNNING) { monitorSendSlowLogAtRunning(slowLogData->clusterId); - } else if(slowLogData->type == SLOW_LOG_READ_QUIT){ - if(monitorSendSlowLogAtQuit(slowLogData->clusterId)){ + } else if (slowLogData->type == SLOW_LOG_READ_QUIT) { + if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) { tscInfo("monitorThreadFunc quit since all slow log sended"); monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); @@ -757,8 +767,11 @@ static void tscMonitorStop() { } int32_t monitorInit() { + int32_t code; + tscInfo("[monitor] tscMonitor init"); - monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + monitorCounterHash = + (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorCounterHash == NULL) { tscError("failed to create monitorCounterHash"); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -766,7 +779,8 @@ int32_t monitorInit() { } taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); - monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + monitorSlowLogHash = + (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorSlowLogHash == NULL) { tscError("failed to create monitorSlowLogHash"); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -781,7 +795,7 @@ int32_t monitorInit() { return -1; } - if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0){ + if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0) { terrno = TSDB_CODE_TSC_INTERNAL_ERROR; return -1; } @@ -798,14 +812,15 @@ int32_t monitorInit() { return -1; } - monitorQueue = taosOpenQueue(); - if(monitorQueue == NULL){ + code = taosOpenQueue(&monitorQueue); + if (code) { + terrno = code; tscError("open queue error since %s", terrstr()); return -1; } taosInitRWLatch(&monitorLock); - if (tscMonitortInit() != 0){ + if (tscMonitortInit() != 0) { return -1; } return 0; @@ -828,21 +843,26 @@ void monitorClose() { taosWUnLockLatch(&monitorLock); } -int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data){ +int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { + int32_t code; + MonitorSlowLogData* slowLogData; + if (atomic_load_32(&slowLogFlag) == -2) { tscError("[monitor] slow log thread is exiting"); return -1; } - MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); - if (slowLogData == NULL) { + + code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData); + if (code) { tscError("[monitor] failed to allocate slow log data"); - return -1; + return terrno = code; } *slowLogData = data; - tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); - if (taosWriteQitem(monitorQueue, slowLogData) == 0){ + tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, + queueTypeStr[slowLogData->type], slowLogData->data); + if (taosWriteQitem(monitorQueue, slowLogData) == 0) { tsem2_post(&monitorSem); - }else{ + } else { monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3c6ef00bf4..1ea03e15a6 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -733,13 +733,17 @@ end: } static void generateTimedTask(int64_t refId, int32_t type) { - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + tmq_t* tmq; + int8_t* pTaskType; + int32_t code; + + tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) return; - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); - if (pTaskType != NULL){ + code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType); + if (code) { *pTaskType = type; - if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0){ + if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { tsem2_post(&tmq->rspSem); } } @@ -877,7 +881,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); - if(tmrId != NULL){ + if (tmrId != NULL) { taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); } taosReleaseRef(tmqMgmt.rsetId, refId); @@ -890,7 +894,12 @@ static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { } int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { - STaosQall* qall = taosAllocateQall(); + STaosQall* qall; + int32_t code; + + code = taosAllocateQall(&qall); + if (code) return code; + taosReadAllQitems(pTmq->delayedTask, qall); int32_t numOfItems = taosQallItemSize(qall); @@ -914,7 +923,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); - taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); + taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, + &pTmq->commitTimer); } else { tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -977,14 +987,14 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { - if(param == NULL) { + if (param == NULL) { return code; } SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - if(pMsg){ + if (pMsg) { taosMemoryFree(pMsg->pEpSet); } tsem_post(&pParam->rspSem); @@ -1057,13 +1067,13 @@ void tmqFreeImpl(void* handle) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); - if(tmq->commitTimer) { + if (tmq->commitTimer) { taosTmrStopA(&tmq->commitTimer); } - if(tmq->epTimer) { + if (tmq->epTimer) { taosTmrStopA(&tmq->epTimer); } - if(tmq->hbLiveTimer) { + if (tmq->hbLiveTimer) { taosTmrStopA(&tmq->hbLiveTimer); } taosMemoryFree(tmq); @@ -1101,6 +1111,8 @@ void tmqMgmtClose(void) { if (errstr != NULL) snprintf(errstr, errstrLen, MSG); tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { + int32_t code; + if (conf == NULL) { SET_ERROR_MSG_TMQ("configure is null") return NULL; @@ -1124,12 +1136,31 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); - pTmq->mqueue = taosOpenQueue(); - pTmq->delayedTask = taosOpenQueue(); - pTmq->qall = taosAllocateQall(); + code = taosOpenQueue(&pTmq->mqueue); + if (code) { + terrno = code; + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + SET_ERROR_MSG_TMQ("open queue failed") + goto _failed; + } - if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || - conf->groupId[0] == 0) { + code = taosOpenQueue(&pTmq->delayedTask); + if (code) { + terrno = code; + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + SET_ERROR_MSG_TMQ("open delayed task queue failed") + goto _failed; + } + + code = taosAllocateQall(&pTmq->qall); + if (code) { + terrno = code; + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + SET_ERROR_MSG_TMQ("allocate qall failed") + goto _failed; + } + + if (conf->groupId[0] == 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty") @@ -1315,7 +1346,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { // init ep timer tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); // init auto commit timer - tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); + tmq->commitTimer = + taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); @@ -1372,20 +1404,20 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (pParam == NULL || pMsg == NULL) { goto FAIL; } - int64_t refId = pParam->refId; - int32_t vgId = pParam->vgId; - uint64_t requestId = pParam->requestId; + int64_t refId = pParam->refId; + int32_t vgId = pParam->vgId; + uint64_t requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { code = TSDB_CODE_TMQ_CONSUMER_CLOSED; goto FAIL; } - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); - if (pRspWrapper == NULL) { + SMqPollRspWrapper* pRspWrapper; + code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); + if (code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); taosReleaseRef(tmqMgmt.rsetId, refId); - code = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } @@ -2575,7 +2607,7 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { - if(param == NULL) return code; + if (param == NULL) return code; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { @@ -2600,9 +2632,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { doUpdateLocalEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); - if (pWrapper == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + SMqAskEpRspWrapper* pWrapper; + code = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0, (void**)&pWrapper); + if (code) { goto END; } @@ -2620,13 +2652,13 @@ END: FAIL: if (pParam->sync) { SAskEpInfo* pInfo = pParam->pParam; - if(pInfo) { + if (pInfo) { pInfo->code = code; tsem_post(&pInfo->sem); } } - if(pMsg){ + if (pMsg) { taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); } @@ -2636,11 +2668,11 @@ FAIL: int32_t syncAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); - if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if (pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; tsem_init(&pInfo->sem, 0, 0); int32_t code = askEp(pTmq, pInfo, true, false); - if(code == 0){ + if (code == 0) { tsem_wait(&pInfo->sem); code = pInfo->code; } @@ -2778,7 +2810,7 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { } static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { - if(param == NULL) { + if (param == NULL) { return code; } SMqVgWalInfoParam* pParam = param; @@ -2812,7 +2844,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&pCommon->rsp); } - if(pMsg){ + if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } @@ -3248,7 +3280,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } - if(param == NULL) { + if (param == NULL) { return code; } SMqSeekParam* pParam = param; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index e5c32f9a43..f28de4695c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -153,6 +153,8 @@ int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + int32_t code; + SSingleWorker *pWorker = NULL; switch (qtype) { case WRITE_QUEUE: @@ -181,13 +183,15 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } if (pWorker == NULL) return -1; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); - if (pMsg == NULL) return -1; + SRpcMsg *pMsg; + code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg); + if (code) return code; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen); - int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); + dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), + pRpc->contLen); + code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 5c635ff5ea..b443cbc351 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -58,8 +58,11 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); - if (pMsg == NULL) return -1; + int32_t code; + SRpcMsg *pMsg; + + code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg); + if (code) return code; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 1c57685414..e356380039 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -126,8 +126,11 @@ void smStopWorker(SSnodeMgmt *pMgmt) { } int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); - if (pMsg == NULL) { + int32_t code; + SRpcMsg *pMsg; + + code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg); + if (code) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 0415340be9..b7d35030b8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -311,6 +311,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { + int32_t code; if (pRpc->contLen < sizeof(SMsgHead)) { dError("invalid rpc msg with no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType), pRpc->contLen); @@ -320,8 +321,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); - if (pMsg == NULL) { + SRpcMsg *pMsg; + code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); + if (code) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; return -1; @@ -335,7 +337,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); + code = vmPutMsgToQueue(pMgmt, pMsg, qtype); if (code != 0) { dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 99d641ff3f..bf35319fae 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -210,8 +210,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed - pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); - if (pMsg == NULL) goto _OVER; + code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); + if (code) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a48188430f..214d0300e1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -61,7 +61,7 @@ struct SRSmaQTaskInfoItem { int32_t len; int8_t type; int64_t suid; - void * qTaskInfo; + void *qTaskInfo; }; static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { @@ -185,7 +185,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { void *pIter = NULL; while ((pIter = taosHashIterate(pStore->uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - SArray * pTbUids = *(SArray **)pIter; + SArray *pTbUids = *(SArray **)pIter; if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) { taosHashCancelIterate(pStore->uidHash, pIter); @@ -213,7 +213,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui } SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SHashObj * infoHash = NULL; + SHashObj *infoHash = NULL; if (!pStat || !(infoHash = RSMA_INFO_HASH(pStat))) { terrno = TSDB_CODE_RSMA_INVALID_STAT; return TSDB_CODE_FAILED; @@ -264,11 +264,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat int8_t idx) { if ((param->qmsgLen > 0) && param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); - SRetention * pRetention = SMA_RETENTION(pSma); - STsdbCfg * pTsdbCfg = SMA_TSDB_CFG(pSma); - SVnode * pVnode = pSma->pVnode; + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + SVnode *pVnode = pSma->pVnode; char taskInfDir[TSDB_FILENAME_LEN] = {0}; - void * pStreamState = NULL; + void *pStreamState = NULL; // set the backend of stream state tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir); @@ -362,6 +362,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat * @return int32_t */ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { + int32_t code; + if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; @@ -374,7 +376,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con } #endif - SSmaEnv * pEnv = SMA_RSMA_ENV(pSma); + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; @@ -401,8 +403,13 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo->suid = suid; T_REF_INIT_VAL(pRSmaInfo, 1); - if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) || - tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 || + code = taosOpenQueue(&pRSmaInfo->queue); + if (code) goto _err; + + code = taosAllocateQall(&pRSmaInfo->qall); + if (code) goto _err; + + if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 || tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { goto _err; } @@ -673,8 +680,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma int32_t code = 0; int32_t lino = 0; SSDataBlock *output = NULL; - SArray * pResList = pItem->pResList; - STSchema * pTSchema = pInfo->pTSchema; + SArray *pResList = pItem->pResList; + STSchema *pTSchema = pInfo->pTSchema; int64_t suid = pInfo->suid; while (1) { @@ -733,7 +740,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma } } - STsdb * sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { @@ -795,11 +802,13 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { + int32_t code; int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload - void * qItem = taosAllocateQitem(size, DEF_QITEM, 0); + void *qItem; - if (!qItem) { - return TSDB_CODE_FAILED; + code = taosAllocateQitem(size, DEF_QITEM, 0, (void **)&qItem); + if (code) { + return code; } void *pItem = qItem; @@ -874,7 +883,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { int32_t idx = level - 1; - void * qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); + void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); if (!qTaskInfo) { @@ -914,7 +923,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { int32_t code = 0; int32_t lino = 0; - SSmaEnv * pEnv = SMA_RSMA_ENV(pSma); + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; @@ -1069,8 +1078,8 @@ _err: static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { int32_t code = 0; int32_t lino = 0; - SVnode * pVnode = pSma->pVnode; - SArray * suidList = NULL; + SVnode *pVnode = pSma->pVnode; + SArray *suidList = NULL; STbUidStore uidStore = {0}; SMetaReader mr = {0}; tb_uid_t suid = 0; @@ -1198,7 +1207,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t code = 0; int32_t lino = 0; int32_t nTaskInfo = 0; - SSma * pSma = pRSmaStat->pSma; + SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; if (taosHashGetSize(pInfoHash) <= 0) { @@ -1231,7 +1240,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { do { int32_t nStreamFlushed = 0; int32_t nSleep = 0; - void * infoHash = NULL; + void *infoHash = NULL; while (true) { while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; @@ -1273,7 +1282,7 @@ _checkpoint: SStreamMeta *pMeta = NULL; int64_t checkpointId = taosGetTimestampNs(); bool checkpointBuilt = false; - void * infoHash = NULL; + void *infoHash = NULL; while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -1346,10 +1355,10 @@ _exit: * @param tmrId */ static void tdRSmaFetchTrigger(void *param, void *tmrId) { - SRSmaRef * pRSmaRef = NULL; - SSma * pSma = NULL; - SRSmaStat * pStat = NULL; - SRSmaInfo * pRSmaInfo = NULL; + SRSmaRef *pRSmaRef = NULL; + SSma *pSma = NULL; + SRSmaStat *pStat = NULL; + SRSmaInfo *pRSmaInfo = NULL; SRSmaInfoItem *pItem = NULL; if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, ¶m, POINTER_BYTES))) { @@ -1517,7 +1526,7 @@ _err: } static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { - void * msg = NULL; + void *msg = NULL; int8_t resume = 0; int32_t nSubmit = 0; int32_t nDelete = 0; @@ -1548,7 +1557,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA _resume_delete: version = RSMA_EXEC_MSG_VER(msg); if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, - &packData.pDataBlock, 1))) { + &packData.pDataBlock, 1))) { taosFreeQitem(msg); goto _err; } @@ -1621,11 +1630,11 @@ _err: int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { int32_t code = 0; int32_t lino = 0; - SVnode * pVnode = pSma->pVnode; - SSmaEnv * pEnv = SMA_RSMA_ENV(pSma); + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - SHashObj * infoHash = NULL; - SArray * pSubmitArr = NULL; + SHashObj *infoHash = NULL; + SArray *pSubmitArr = NULL; bool isFetchAll = false; if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index eb1da51b45..16c523a50e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -164,7 +164,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } - tOffsetCopy(&dataRsp.common.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists + tOffsetCopy(&dataRsp.common.reqOffset, + pOffset); // reqOffset represents the current date offset, may be changed if wal not exists code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -178,25 +179,25 @@ end : { } } -#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \ - SDecoder decoder = {0};\ - TYPE req = {0}; \ - void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ - int32_t len = pHead->bodyLen - sizeof(SMsgHead); \ - tDecoderInit(&decoder, data, len); \ - if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \ - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ - pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ - fetchVer++; \ - DELETE_FUNC(&req); \ - tDecoderClear(&decoder); \ - continue; \ - } \ - DELETE_FUNC(&req); \ +#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \ + SDecoder decoder = {0}; \ + TYPE req = {0}; \ + void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ + int32_t len = pHead->bodyLen - sizeof(SMsgHead); \ + tDecoderInit(&decoder, data, len); \ + if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \ + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \ + " msgType %d", \ + pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ + fetchVer++; \ + DELETE_FUNC(&req); \ + tDecoderClear(&decoder); \ + continue; \ + } \ + DELETE_FUNC(&req); \ tDecoderClear(&decoder); -static void tDeleteCommon(void* parm) { -} +static void tDeleteCommon(void* parm) {} static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { @@ -313,9 +314,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); continue; } - int32_t tLen = sizeof(SMqRspHead) + len; - void* tBuf = taosMemoryCalloc(1, tLen); - void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); + int32_t tLen = sizeof(SMqRspHead) + len; + void* tBuf = taosMemoryCalloc(1, tLen); + void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); SEncoder encoder = {0}; tEncoderInit(&encoder, metaBuff, len); code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); @@ -374,13 +375,13 @@ end: } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { - int32_t code = 0; + int32_t code = 0; STqOffsetVal reqOffset = {0}; tOffsetCopy(&reqOffset, &pRequest->reqOffset); // reset the offset if needed if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) { - bool blockReturned = false; + bool blockReturned = false; code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned); if (code != 0) { goto END; @@ -392,7 +393,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ } } else if (reqOffset.type == 0) { // use the consumer specified offset uError("req offset type is 0"); - code = TSDB_CODE_TMQ_INVALID_MSG; + code = TSDB_CODE_TMQ_INVALID_MSG; goto END; } @@ -416,8 +417,8 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int pMsgHead->walever = ever; } -int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqBatchMetaRsp* pRsp, - int32_t vgId) { +int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, + const SMqBatchMetaRsp* pRsp, int32_t vgId) { int32_t len = 0; int32_t code = 0; tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code); @@ -444,8 +445,8 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; tmsgSendRsp(&resp); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d", vgId, - pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d", + vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type); return 0; } @@ -527,6 +528,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, } int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { + int32_t code; SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -570,11 +572,11 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* taosArrayDestroy(pRes->uidList); if (type == 0) { - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if (*pRefBlock == NULL) { + code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock); + if (code) { blockDataCleanup(pDelBlock); taosMemoryFree(pDelBlock); - return TSDB_CODE_OUT_OF_MEMORY; + return code; } ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK; @@ -614,7 +616,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b continue; } - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 15288c4406..aa377cd922 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -130,9 +130,11 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0); - if (NULL == pBuf) { - return TSDB_CODE_OUT_OF_MEMORY; + SDataDeleterBuf* pBuf; + + int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf); + if (code) { + return code; } if (!allocBuf(pDeleter, pInput, pBuf)) { @@ -227,7 +229,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDeleter->pDataBlocks); taosThreadMutexDestroy(&pDeleter->mutex); - + taosMemoryFree(pDeleter->pManager); return TSDB_CODE_SUCCESS; } @@ -270,12 +272,11 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; deleter->queryEnd = false; - deleter->pDataBlocks = taosOpenQueue(); - taosThreadMutexInit(&deleter->mutex, NULL); - if (NULL == deleter->pDataBlocks) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = taosOpenQueue(&deleter->pDataBlocks); + if (code) { goto _end; } + taosThreadMutexInit(&deleter->mutex, NULL); *pHandle = deleter; return code; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 297c87ab40..3981cedd3f 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -104,7 +104,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn } int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols); - int32_t len = tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); + int32_t len = + tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0); if (len < dataLen) { pEntry->compressed = 1; pEntry->dataLen = len; @@ -168,9 +169,11 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { int32_t code = 0; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0); - if (NULL == pBuf) { - return TSDB_CODE_OUT_OF_MEMORY; + SDataDispatchBuf* pBuf; + + code = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0, (void**)&pBuf); + if (code) { + return code; } if (!allocBuf(pDispatcher, pInput, pBuf)) { @@ -228,7 +231,6 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); } - static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (NULL == pDispatcher->nextOutput.pData) { @@ -291,6 +293,8 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { } int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { + int32_t code; + SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -309,7 +313,11 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; - dispatcher->pDataBlocks = taosOpenQueue(); + code = taosOpenQueue(&dispatcher->pDataBlocks); + if (code) { + terrno = code; + goto _return; + } taosThreadMutexInit(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 9ff6fc3e49..8502856754 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -158,7 +158,9 @@ int32_t cleanupTaskQueue() { } int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { - SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); + SSchedMsg* pSchedMsg; + int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg); + if (rc) return rc; pSchedMsg->fp = NULL; pSchedMsg->ahandle = execFn; pSchedMsg->thandle = execParam; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 990e7fb987..023783a2ae 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -31,9 +31,11 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId); SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, int32_t srcTaskId) { - SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); - if (pChkpoint == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + SStreamDataBlock* pChkpoint; + + int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint); + if (code) { + terrno = code; return NULL; } @@ -425,7 +427,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream } void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { - pTask->chkInfo.startTs = 0; // clear the recorded start time + pTask->chkInfo.startTs = 0; // clear the recorded start time streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index fae90f4db8..2994287aff 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -16,8 +16,11 @@ #include "streamInt.h" SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); - if (pData == NULL) { + SStreamDataBlock* pData; + + int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData); + if (code) { + terrno = code; return NULL; } @@ -34,7 +37,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); int32_t compLen = *(int32_t*)pRetrieve->data; @@ -42,7 +45,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe char* pInput = pRetrieve->data + PAYLOAD_PREFIX_LEN; if (pRetrieve->compressed && compLen < fullLen) { - char* p = taosMemoryMalloc(fullLen); + char* p = taosMemoryMalloc(fullLen); int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); ASSERT(len == fullLen); pInput = p; @@ -69,10 +72,14 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe return pData; } -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { - SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); - if (pStreamBlocks == NULL) { +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, + SArray* pRes) { + SStreamDataBlock* pStreamBlocks; + + int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize, (void**)&pStreamBlocks); + if (code) { taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + terrno = code; return NULL; } @@ -132,8 +139,10 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen); - if (pDataSubmit == NULL) { + SStreamDataSubmit* pDataSubmit; + int32_t code = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen, (void**)&pDataSubmit); + if (code) { + terrno = code; return NULL; } @@ -151,8 +160,11 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { } SStreamMergedSubmit* streamMergedSubmitNew() { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); - if (pMerged == NULL) { + SStreamMergedSubmit* pMerged; + + int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)&pMerged); + if (code) { + terrno = code; return NULL; } @@ -178,7 +190,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm // todo handle memory error SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { terrno = 0; - + if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; @@ -212,7 +224,8 @@ SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueI taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; } else { - stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type); + stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), + dst->type); return NULL; } } @@ -245,8 +258,9 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - SStreamDataBlock* pBlock = (SStreamDataBlock*) data; + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE) { + SStreamDataBlock* pBlock = (SStreamDataBlock*)data; taosArrayDestroyEx(pBlock->blocks, freeItems); taosFreeQitem(pBlock); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9c5c230a3d..7bc50417bd 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -15,8 +15,8 @@ #include "streamInt.h" -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MAX_SMOOTH_BURST_RATIO 5 // 5 sec // todo refactor: // read data from input queue @@ -42,18 +42,26 @@ static void streamQueueCleanup(SStreamQueue* pQueue) { static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } SStreamQueue* streamQueueOpen(int64_t cap) { + int32_t code; + SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); if (pQueue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pQueue->pQueue = taosOpenQueue(); - pQueue->qall = taosAllocateQall(); + code = taosOpenQueue(&pQueue->pQueue); + if (code) { + taosMemoryFreeClear(pQueue); + terrno = code; + return NULL; + } - if (pQueue->pQueue == NULL || pQueue->qall == NULL) { - if (pQueue->pQueue) taosCloseQueue(pQueue->pQueue); - if (pQueue->qall) taosFreeQall(pQueue->qall); + code = taosAllocateQall(&pQueue->qall); + if (code) { + taosCloseQueue(pQueue->pQueue); taosMemoryFree(pQueue); + terrno = code; return NULL; } @@ -64,7 +72,8 @@ SStreamQueue* streamQueueOpen(int64_t cap) { } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { - stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue, taosQueueItemSize(pQueue->pQueue)); + stDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->pQueue, + taosQueueItemSize(pQueue->pQueue)); streamQueueCleanup(pQueue); taosFreeQall(pQueue->qall); @@ -129,12 +138,12 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { } int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) { - STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode)); + STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode)); return p->dataSize; } void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) { - STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode)); + STaosQnode* p = (STaosQnode*)((char*)pItem - sizeof(STaosQnode)); p->dataSize += size; } @@ -152,7 +161,7 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, - int32_t* blockSize) { + int32_t* blockSize) { const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; @@ -174,7 +183,6 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); if (qItem == NULL) { - // restore the token to bucket if (*numOfBlocks > 0) { *blockSize = streamQueueItemGetSize(*pInput); @@ -203,7 +211,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte *numOfBlocks = 1; *pInput = qItem; return EXEC_CONTINUE; - } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block + } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { @@ -223,7 +231,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte if (newRet == NULL) { if (terrno != 0) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, - tstrerror(terrno)); + tstrerror(terrno)); } *blockSize = streamQueueItemGetSize(*pInput); @@ -284,14 +292,14 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); + msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pTask->inputq.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); + pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamFreeQitem(pItem); return -1; } @@ -314,7 +322,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); + pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); @@ -332,16 +340,20 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.delaySchedParam != 0)) { atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); + stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, + pTask->schedInfo.status); } return 0; } int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { - SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); - if (pTranstate == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + int32_t code; + SStreamDataBlock* pTranstate; + + code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pTranstate); + if (code) { + return code; } SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -371,13 +383,13 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { // the result should be put into the outputQ in any cases, the result may be lost otherwise. int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputq.queue->pQueue; - int32_t code = taosWriteQitem(pQueue, pBlock); + int32_t code = taosWriteQitem(pQueue, pBlock); int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); if (code != 0) { stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", - pTask->id.idStr, total + 1, size, tstrerror(code)); + pTask->id.idStr, total + 1, size, tstrerror(code)); } else { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn( @@ -427,7 +439,7 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { // increase the new available quota as time goes on int64_t deltaQuota = now - pBucket->quotaFillTimestamp; - double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; + double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); pBucket->quotaFillTimestamp = now; @@ -447,7 +459,7 @@ bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) { if (pBucket->quotaRemain > 0) { pBucket->numOfToken -= 1; return true; - } else { // no available size quota now + } else { // no available size quota now return false; } } else { @@ -460,8 +472,6 @@ void streamTaskPutbackToken(STokenBucket* pBucket) { } // size in KB -void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { - pBucket->quotaRemain -= SIZE_IN_MiB(bytes); -} +void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); } void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 0a54dfa4c8..f33c2c7b68 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -134,11 +134,14 @@ void streamTaskSchedHelper(void* param, void* tmrId) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (status == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (pTrigger == NULL) { + SStreamTrigger* pTrigger; + + int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&pTrigger); + if (code) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); + terrno = code; return; } @@ -156,7 +159,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; - int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer); return; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4cbe0cb136..221250ba74 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -979,10 +979,13 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { } static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); - if (pData == NULL) { + int32_t code; + SStreamDataBlock* pData; + + code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock), (void**)&pData); + if (code) { stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr); - return terrno; + return terrno = code; } // enqueue @@ -992,7 +995,7 @@ static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; - int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); + code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { taosFreeQitem(pData); return code; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 45a8a462fb..b2d2622407 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -17,6 +17,7 @@ #include "tqueue.h" #include "taoserror.h" #include "tlog.h" +#include "tutil.h" int64_t tsRpcQueueMemoryAllowed = 0; int64_t tsRpcQueueMemoryUsed = 0; @@ -58,20 +59,20 @@ struct STaosQall { void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; } void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; } -STaosQueue *taosOpenQueue() { - STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue)); - if (queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +int32_t taosOpenQueue(STaosQueue **queue) { + *queue = taosMemoryCalloc(1, sizeof(STaosQueue)); + if (*queue == NULL) { + return (terrno = TSDB_CODE_OUT_OF_MEMORY); } - if (taosThreadMutexInit(&queue->mutex, NULL) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL); + if (code) { + taosMemoryFreeClear(*queue); + return (terrno = TAOS_SYSTEM_ERROR(code)); } uDebug("queue:%p is opened", queue); - return queue; + return 0; } void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) { @@ -146,11 +147,12 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { return memOfItems; } -void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { +int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) { + *item = NULL; + STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } pNode->dataSize = dataSize; @@ -165,15 +167,15 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { tsRpcQueueMemoryAllowed); atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); taosMemoryFree(pNode); - terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; - return NULL; + return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); } uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced); } else { uTrace("item:%p, node:%p is allocated", pNode->item, pNode); } - return pNode->item; + *item = pNode->item; + return 0; } void taosFreeQitem(void *pItem) { @@ -221,13 +223,17 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { } queue->numOfItems++; queue->memOfItems += (pNode->size + pNode->dataSize); - if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); + if (queue->qset) { + atomic_add_fetch_32(&queue->qset->numOfItems, 1); + } uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); taosThreadMutexUnlock(&queue->mutex); - if (queue->qset) tsem_post(&queue->qset->sem); + if (queue->qset) { + tsem_post(&queue->qset->sem); + } return code; } @@ -241,10 +247,14 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { pNode = queue->head; *ppItem = pNode->item; queue->head = pNode->next; - if (queue->head == NULL) queue->tail = NULL; + if (queue->head == NULL) { + queue->tail = NULL; + } queue->numOfItems--; queue->memOfItems -= (pNode->size + pNode->dataSize); - if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); + if (queue->qset) { + atomic_sub_fetch_32(&queue->qset->numOfItems, 1); + } code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, queue->memOfItems); @@ -255,12 +265,12 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { return code; } -STaosQall *taosAllocateQall() { - STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall)); - if (qall != NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; +int32_t taosAllocateQall(STaosQall **qall) { + *qall = taosMemoryCalloc(1, sizeof(STaosQall)); + if (*qall == NULL) { + return terrno = TSDB_CODE_OUT_OF_MEMORY; } - return qall; + return 0; } void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } @@ -290,7 +300,9 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { queue->memOfItems = 0; uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems, queue->memOfItems); - if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); + if (queue->qset) { + atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); + } } taosThreadMutexUnlock(&queue->mutex); @@ -326,18 +338,17 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { return num; } -STaosQset *taosOpenQset() { - STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1); - if (qset == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +int32_t taosOpenQset(STaosQset **qset) { + *qset = taosMemoryCalloc(sizeof(STaosQset), 1); + if (*qset == NULL) { + return terrno = TSDB_CODE_OUT_OF_MEMORY; } - taosThreadMutexInit(&qset->mutex, NULL); - tsem_init(&qset->sem, 0, 0); + taosThreadMutexInit(&(*qset)->mutex, NULL); + tsem_init(&(*qset)->sem, 0, 0); uDebug("qset:%p is opened", qset); - return qset; + return 0; } void taosCloseQset(STaosQset *qset) { @@ -533,9 +544,7 @@ int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfI void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -void taosQueueSetThreadId(STaosQueue* pQueue, int64_t threadId) { - pQueue->threadId = threadId; -} +void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; } int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7a97dc3527..0960d37817 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -16,20 +16,22 @@ #define _DEFAULT_SOURCE #include "tworker.h" #include "taoserror.h" +#include "tcompare.h" #include "tgeosctx.h" #include "tlog.h" -#include "tcompare.h" #define QUEUE_THRESHOLD (1000 * 1000) typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { - pool->qset = taosOpenQset(); + int32_t code = taosOpenQset(&pool->qset); + if (code) return code; + pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker)); if (pool->workers == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + taosCloseQset(pool->qset); + return terrno = TSDB_CODE_OUT_OF_MEMORY; } (void)taosThreadMutexInit(&pool->mutex, NULL); @@ -109,8 +111,14 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { } STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { - STaosQueue *queue = taosOpenQueue(); - if (queue == NULL) return NULL; + int32_t code; + STaosQueue *queue; + + code = taosOpenQueue(&queue); + if (code) { + terrno = code; + return NULL; + } taosThreadMutexLock(&pool->mutex); taosSetQueueFp(queue, fp, NULL); @@ -150,11 +158,17 @@ void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { } int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) { - pool->qset = taosOpenQset(); + int32_t code; + + code = taosOpenQset(&pool->qset); + if (code) { + return terrno = code; + } + pool->workers = taosArrayInit(2, sizeof(SQueueWorker *)); if (pool->workers == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + taosCloseQset(pool->qset); + return terrno = TSDB_CODE_OUT_OF_MEMORY; } (void)taosThreadMutexInit(&pool->mutex, NULL); @@ -228,8 +242,14 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { } STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) { - STaosQueue *queue = taosOpenQueue(); - if (queue == NULL) return NULL; + int32_t code; + STaosQueue *queue; + + code = taosOpenQueue(&queue); + if (code) { + terrno = code; + return NULL; + } taosThreadMutexLock(&pool->mutex); taosSetQueueFp(queue, fp, NULL); @@ -375,20 +395,21 @@ static void *tWWorkerThreadFp(SWWorker *worker) { STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { taosThreadMutexLock(&pool->mutex); - SWWorker *worker = pool->workers + pool->nextId; - int32_t code = -1; + SWWorker *worker = pool->workers + pool->nextId; + int32_t code = -1; + STaosQueue *queue; - STaosQueue *queue = taosOpenQueue(); - if (queue == NULL) goto _OVER; + code = taosOpenQueue(&queue); + if (code) goto _OVER; taosSetQueueFp(queue, NULL, fp); if (worker->qset == NULL) { - worker->qset = taosOpenQset(); - if (worker->qset == NULL) goto _OVER; + code = taosOpenQset(&worker->qset); + if (code) goto _OVER; taosAddIntoQset(worker->qset, queue, ahandle); - worker->qall = taosAllocateQall(); - if (worker->qall == NULL) goto _OVER; + code = taosAllocateQall(&worker->qall); + if (code) goto _OVER; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -406,12 +427,10 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pool->nextId = (pool->nextId + 1) % pool->max; } - code = 0; - _OVER: taosThreadMutexUnlock(&pool->mutex); - if (code == -1) { + if (code) { if (queue != NULL) taosCloseQueue(queue); if (worker->qset != NULL) taosCloseQset(worker->qset); if (worker->qall != NULL) taosFreeQall(worker->qall); @@ -517,13 +536,13 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) { tWWorkerFreeQueue(&pWorker->pool, pWorker->queue); } -static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool); +static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); -static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool); -static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool); +static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker); -#define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) +#define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) #define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF) static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) { @@ -544,7 +563,7 @@ static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) { static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); } -static bool atomicCompareExchangeActive(int64_t* ptr, int32_t* expectedVal, int32_t newVal) { +static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) { int64_t oldVal64 = *expectedVal, newVal64 = newVal; int32_t running = GET_RUNNING_N(*ptr); oldVal64 <<= 32; @@ -560,7 +579,7 @@ static bool atomicCompareExchangeActive(int64_t* ptr, int32_t* expectedVal, int3 } } -static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, int32_t newVal) { +static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) { int64_t oldVal64 = *expectedVal, newVal64 = newVal; int64_t activeShifted = GET_ACTIVE_N(*ptr); activeShifted <<= 32; @@ -576,7 +595,7 @@ static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, } static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive, - int32_t *expectedRunning, int32_t newRunning) { + int32_t *expectedRunning, int32_t newRunning) { int64_t oldVal64 = *expectedActive, newVal64 = newActive; oldVal64 <<= 32; oldVal64 |= *expectedRunning; @@ -641,7 +660,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - int32_t waiting = pPool->waitingAfterBlockN; + int32_t waiting = pPool->waitingAfterBlockN; while (waiting > 0) { int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1); if (waitingNew == waiting) { @@ -656,10 +675,10 @@ static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) { return ret; } -static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { +static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - int32_t waiting = pPool->waitingBeforeProcessMsgN; + int32_t waiting = pPool->waitingBeforeProcessMsgN; while (waiting > 0) { int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1); if (waitingNew == waiting) { @@ -674,10 +693,10 @@ static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { return ret; } -static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { +static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; - int64_t val64 = pPool->activeRunningN; + int64_t val64 = pPool->activeRunningN; int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64); while (active > minActive) { if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) @@ -687,7 +706,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { return false; } -static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) { while (1) { int64_t val64 = pPool->activeRunningN; int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); @@ -709,17 +728,17 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { return TSDB_CODE_SUCCESS; } -bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker) { +bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) { if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) || tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) { taosThreadMutexLock(&pPool->poolLock); - SListNode* pNode = listNode(pWorker); + SListNode *pNode = listNode(pWorker); tdListPopNode(pPool->workers, pNode); // reclaim some workers if (pWorker->id >= pPool->maxInUse) { while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) { - SListNode* head = tdListPopHead(pPool->exitedWorkers); - SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; + SListNode *head = tdListPopHead(pPool->exitedWorkers); + SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data; if (pWorker && taosCheckPthreadValid(pWorker->thread)) { taosThreadJoin(pWorker->thread, NULL); taosThreadClear(&pWorker->thread); @@ -758,8 +777,9 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ } int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { - pool->qset = taosOpenQset(); - if (!pool->qset) return terrno; + int32_t code; + code = taosOpenQset(&pool->qset); + if (code) return terrno = code; pool->workers = tdListNew(sizeof(SQueryAutoQWorker)); if (!pool->workers) return TSDB_CODE_OUT_OF_MEMORY; pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker)); @@ -812,16 +832,16 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond); taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); - int32_t idx = 0; - SQueryAutoQWorker* worker = NULL; + int32_t idx = 0; + SQueryAutoQWorker *worker = NULL; while (true) { taosThreadMutexLock(&pPool->poolLock); if (listNEles(pPool->workers) == 0) { taosThreadMutexUnlock(&pPool->poolLock); break; } - SListNode* pNode = tdListPopHead(pPool->workers); - worker = (SQueryAutoQWorker*)pNode->data; + SListNode *pNode = tdListPopHead(pPool->workers); + worker = (SQueryAutoQWorker *)pNode->data; taosThreadMutexUnlock(&pPool->poolLock); if (worker && taosCheckPthreadValid(worker->thread)) { taosThreadJoin(worker->thread, NULL); @@ -831,8 +851,8 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } while (listNEles(pPool->backupWorkers) > 0) { - SListNode* pNode = tdListPopHead(pPool->backupWorkers); - worker = (SQueryAutoQWorker*)pNode->data; + SListNode *pNode = tdListPopHead(pPool->backupWorkers); + worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { taosThreadJoin(worker->thread, NULL); taosThreadClear(&worker->thread); @@ -841,8 +861,8 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } while (listNEles(pPool->exitedWorkers) > 0) { - SListNode* pNode = tdListPopHead(pPool->exitedWorkers); - worker = (SQueryAutoQWorker*)pNode->data; + SListNode *pNode = tdListPopHead(pPool->exitedWorkers); + worker = (SQueryAutoQWorker *)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { taosThreadJoin(worker->thread, NULL); taosThreadClear(&worker->thread); @@ -867,14 +887,18 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) { - STaosQueue *queue = taosOpenQueue(); - if (queue == NULL) return NULL; + STaosQueue *queue; + int32_t code = taosOpenQueue(&queue); + if (code) { + terrno = code; + return NULL; + } taosThreadMutexLock(&pool->poolLock); taosSetQueueFp(queue, fp, NULL); taosAddIntoQset(pool->qset, queue, ahandle); - SQueryAutoQWorker worker = {0}; - SQueryAutoQWorker* pWorker = NULL; + SQueryAutoQWorker worker = {0}; + SQueryAutoQWorker *pWorker = NULL; // spawn a thread to process queue if (pool->num < pool->max) { @@ -882,14 +906,14 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand worker.id = listNEles(pool->workers); worker.backupIdx = -1; worker.pool = pool; - SListNode* pNode = tdListAdd(pool->workers, &worker); + SListNode *pNode = tdListAdd(pool->workers, &worker); if (!pNode) { taosCloseQueue(queue); queue = NULL; terrno = TSDB_CODE_OUT_OF_MEMORY; break; } - pWorker = (SQueryAutoQWorker*)pNode->data; + pWorker = (SQueryAutoQWorker *)pNode->data; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -915,11 +939,9 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand return queue; } -void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) { - taosCloseQueue(pQ); -} +void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) { taosCloseQueue(pQ); } -static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) { +static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) { // try backup pool int32_t backup = pool->backupNum; while (backup > 0) { @@ -931,20 +953,20 @@ static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) { backup = backupNew; } // backup pool is empty, create new - SQueryAutoQWorker* pWorker = NULL; - SQueryAutoQWorker worker = {0}; + SQueryAutoQWorker *pWorker = NULL; + SQueryAutoQWorker worker = {0}; worker.pool = pool; worker.backupIdx = -1; taosThreadMutexLock(&pool->poolLock); worker.id = listNEles(pool->workers); - SListNode* pNode = tdListAdd(pool->workers, &worker); + SListNode *pNode = tdListAdd(pool->workers, &worker); if (!pNode) { taosThreadMutexUnlock(&pool->poolLock); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } taosThreadMutexUnlock(&pool->poolLock); - pWorker = (SQueryAutoQWorker*)pNode->data; + pWorker = (SQueryAutoQWorker *)pNode->data; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -960,7 +982,7 @@ static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool) { } static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { - SQueryAutoQWorkerPool* pPool = p; + SQueryAutoQWorkerPool *pPool = p; if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) || tQueryAutoQWorkerTryDecActive(p, 1)) { } else { @@ -974,9 +996,9 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { } static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { - SQueryAutoQWorkerPool* pPool = p; - int64_t val64 = pPool->activeRunningN; - int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); + SQueryAutoQWorkerPool *pPool = p; + int64_t val64 = pPool->activeRunningN; + int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); while (running < pPool->num) { if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) { return TSDB_CODE_SUCCESS; From 5c6c9a8219fd455ff6f44e203d7f899b0fd80845 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 16 Jul 2024 20:26:46 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix=EF=BC=9A=20test=20compile?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- source/libs/transport/test/svrBench.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index a3fa81662c..dff3efef12 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -67,7 +67,7 @@ void *processShellMsg(void *arg) { int type; SQueueInfo qinfo = {0}; - qall = taosAllocateQall(); + taosAllocateQall(&qall); while (1) { int numOfMsgs = taosReadAllQitemsFromQset(multiQ->qset[idx], qall, &qinfo); @@ -129,7 +129,7 @@ void *processShellMsg(void *arg) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); + taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0, (void **)&pTemp); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); int32_t idx = balance % multiQ->numOfThread; @@ -212,8 +212,8 @@ int main(int argc, char *argv[]) { multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread); for (int i = 0; i < numOfAthread; i++) { - multiQ->qhandle[i] = taosOpenQueue(); - multiQ->qset[i] = taosOpenQset(); + taosOpenQueue(&multiQ->qhandle[i]); + taosOpenQset(&multiQ->qset[i]); taosAddIntoQset(multiQ->qset[i], multiQ->qhandle[i], NULL); } TThread *threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread); From d8e3279ca65304e1279498234b06785494709f64 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 17 Jul 2024 08:58:59 +0800 Subject: [PATCH 3/5] fix: little fix --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 16c523a50e..4ecab9e400 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -572,7 +572,7 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* taosArrayDestroy(pRes->uidList); if (type == 0) { - code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock); + code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, (void**)&pRefBlock); if (code) { blockDataCleanup(pDelBlock); taosMemoryFree(pDelBlock); From 53d917cf897c9656549c971bb6ddef630455e128 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 17 Jul 2024 09:57:13 +0800 Subject: [PATCH 4/5] fix CI problem --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1ea03e15a6..c363686343 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -741,7 +741,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (tmq == NULL) return; code = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0, (void**)&pTaskType); - if (code) { + if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { tsem2_post(&tmq->rspSem); From fdd866737d00c7534a9af37d9191626d6187fd3d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 17 Jul 2024 13:17:40 +0800 Subject: [PATCH 5/5] fix more --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4ecab9e400..16c523a50e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -572,7 +572,7 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* taosArrayDestroy(pRes->uidList); if (type == 0) { - code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, (void**)&pRefBlock); + code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock); if (code) { blockDataCleanup(pDelBlock); taosMemoryFree(pDelBlock);