Optimizing Container Synchronization

Efficient data synchronization is crucial in high-performance computing and multi-threaded applications. This article explores an optimization technique for scenarios where frequent writes to a container occur in a multi-threaded environment. We'll examine the challenges of traditional synchronization methods and present an advanced approach that significantly improves performance in write-heavy environments. The method in question is useful because it is easy to implement and flexible, unlike pre-optimized containers that may be platform-specific, require specific data types, or bring additional library dependencies.

Traditional Approaches and Their Limitations

Imagine a scenario where we have a cache of user transactions:

struct TransactionData
{
    long transactionId;
    long userId;
    unsigned long date;
    double amount;
    int type;
    std::string description;
};

std::map<long, std::vector<TransactionData>> transactionCache; // key - userId

In a multi-threaded environment, we need to synchronize access to this cache. The traditional approach might involve using a mutex:

class SimpleSynchronizedCache
{
public:
    void write(const TransactionData& transaction)
    {
        std::lock_guard lock(_cacheMutex);
        _transactionCache[transaction.userId].push_back(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        std::lock_guard lock(_cacheMutex);
        try
        {
            return _transactionCache.at(userId);
        }
        catch (const std::out_of_range& ex)
        {
            return std::vector<TransactionData>();
        }
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        std::lock_guard lock(_cacheMutex);
        auto userNode = _transactionCache.extract(userId);
        return userNode.empty() ? std::vector<TransactionData>() : std::move(userNode.mapped());
    }

private:
    std::map<long, std::vector<TransactionData>> _transactionCache;
    std::mutex _cacheMutex;
};
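
To illustrate the contention point, here is a minimal usage sketch (not part of the original article; the thread and iteration counts are arbitrary) in which several writer threads all funnel through the cache's single mutex:

#include <thread>
#include <vector>

// Minimal sketch: a few writer threads all contend on the single _cacheMutex.
void simpleCacheUsageExample()
{
    SimpleSynchronizedCache cache;
    std::vector<std::thread> writers;

    for (long workerId = 0; workerId < 8; ++workerId)
    {
        writers.emplace_back([&cache, workerId]()
        {
            for (long i = 0; i < 1000; ++i)
            {
                // Every call serializes on the same lock, regardless of the user.
                cache.write({ i, workerId, 0, 10.0, 0, "example" });
            }
        });
    }

    for (auto& writer : writers)
        writer.join();
}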

As system load increases, especially with frequent reads, we might consider using a shared_mutex:

class CacheWithSharedMutex
{
public:
    void write(const TransactionData& transaction)
    {
        std::lock_guard lock(_cacheMutex);
        _transactionCache[transaction.userId].push_back(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        std::shared_lock lock(_cacheMutex);
        try
        {
            return _transactionCache.at(userId);
        }
        catch (const std::out_of_range& ex)
        {
            return std::vector<TransactionData>();
        }
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        std::lock_guard lock(_cacheMutex);
        auto userNode = _transactionCache.extract(userId);
        return userNode.empty() ? std::vector<TransactionData>() : std::move(userNode.mapped());
    }

private:
    std::map<long, std::vector<TransactionData>> _transactionCache;
    std::shared_mutex _cacheMutex;
};

However, when the load is primarily generated by writes rather than reads, the advantage of a shared_mutex over a regular mutex becomes minimal. The lock will mostly be acquired exclusively, negating the benefits of shared access.

Moreover, let's imagine that we don't use read() at all; instead, we frequently write incoming transactions and periodically flush the accumulated transaction vectors using pop(). Since pop() involves reading with extraction, both write() and pop() operations modify the cache, necessitating exclusive rather than shared access. Thus, the shared_lock becomes entirely useless as an optimization over a regular mutex, and may even perform worse: its more intricate implementation is now used for the same exclusive locks that a faster regular mutex provides. Clearly, we need something else.

Optimizing Synchronization With the Sharding Approach

Consider the following conditions:

  1. A multi-threaded environment with a shared container
  2. Frequent modification of the container from different threads
  3. Objects in the container can be divided for parallel processing by some member variable

Regarding point 3, in our cache, transactions from different users can be processed independently. While creating a mutex for each user might seem ideal, it would lead to excessive overhead in maintaining so many locks. Instead, we can divide our cache into a fixed number of chunks based on the user ID, in a process called sharding. This approach reduces the overhead and yet allows parallel processing, thereby optimizing performance in a multi-threaded environment.

class ShardedCache
{
public:
    ShardedCache(size_t shardSize):
        _shardSize(shardSize),
        _transactionCaches(shardSize)
    {
        std::generate(
            _transactionCaches.begin(),
            _transactionCaches.end(),
            []() { return std::make_unique<SimpleSynchronizedCache>(); });
    }

    void write(const TransactionData& transaction)
    {
        _transactionCaches[transaction.userId % _shardSize]->write(transaction);
    }

    std::vector<TransactionData> read(const long& userId)
    {
        return _transactionCaches[userId % _shardSize]->read(userId);
    }

    std::vector<TransactionData> pop(const long& userId)
    {
        return std::move(_transactionCaches[userId % _shardSize]->pop(userId));
    }

private:
    const size_t _shardSize;
    std::vector<std::unique_ptr<SimpleSynchronizedCache>> _transactionCaches;
};

This approach allows for finer-grained locking without the overhead of maintaining an excessive number of mutexes. The division can be adjusted based on system architecture specifics, such as the size of the thread pool that works with the cache, or hardware concurrency.
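
For illustration, here is a minimal usage sketch (not from the original article) that picks the shard count relative to std::thread::hardware_concurrency(); the factor of four is an assumption, in line with the later suggestion to use a shard size several times larger than the concurrency, not a measured optimum:

#include <algorithm>
#include <thread>

// Minimal sketch: derive the shard count from hardware concurrency and use the cache.
// The factor of 4 is an illustrative assumption, not a measured optimum.
void shardedCacheUsageExample()
{
    const size_t concurrency = std::max<size_t>(1, std::thread::hardware_concurrency());
    ShardedCache cache(concurrency * 4);

    TransactionData transaction{ 1, 42, 0, 99.90, 0, "subscription" };

    cache.write(transaction);       // routed to shard 42 % (concurrency * 4)
    auto pending = cache.pop(42);   // extracts the accumulated vector for user 42
}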

Let's run tests in which we check how sharding accelerates cache performance by trying different partition sizes.

Performance Comparison

In these tests, we aim to do more than simply measure the maximum number of operations the processor can handle. We want to observe how the cache behaves under conditions that closely resemble real-world scenarios, where transactions occur randomly. Our optimization goal is to minimize the processing time for these transactions, which improves system responsiveness in practical applications.

The implementation and tests are available in the GitHub repository.

#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <random>
#include <chrono>
#include <functional>

#include "SynchronizedContainers.h"

const auto hardware_concurrency = (size_t)std::thread::hardware_concurrency();

class TaskPool
{
public:
    template <typename Callable>
    TaskPool(size_t poolSize, Callable task)
    {
        for (auto i = 0; i < poolSize; ++i)
            _workers.emplace_back(task);
    }

    ~TaskPool()
    {
        for (auto& worker : _workers)
            worker.join();
    }

private:
    std::vector<std::thread> _workers;
};

template <typename CacheImpl>
class Test
{
public:
    template <typename ... CacheArgs>
    Test(const int testrunsNum, const size_t writeWorkersNum, const size_t popWorkersNum,
        const std::string& resultsFile, CacheArgs&& ... cacheArgs) :
        _cache(std::forward<CacheArgs>(cacheArgs)...),
        _writeWorkersNum(writeWorkersNum), _popWorkersNum(popWorkersNum),
        _resultsFile(resultsFile),
        _testrunsNum(testrunsNum), _testStarted(false)
    {
        std::random_device rd;
        _randomGenerator = std::mt19937(rd());
    }


    void run()
    {
        for (auto i = 0; i < _testrunsNum; ++i)
        {
            {
                std::lock_guard lock(_testStartSync);
                _testStarted = false;
            }

            {
                // these pools won't just fire as many operations as they can,
                // but will emulate requests arriving to the cache in real time in a multithreaded environment
                auto writeTestPool = TaskPool(_writeWorkersNum, std::bind(&Test::writeTransactions, this));
                auto popTestPool = TaskPool(_popWorkersNum, std::bind(&Test::popTransactions, this));

                _writeTime = 0;
                _writeOpNum = 0;
                _popTime = 0;
                _popOpNum = 0;

                {
                    std::lock_guard lock(_testStartSync);
                    _testStarted = true;
                    _testStartCv.notify_all();
                }
            }   // worker threads are joined here by the TaskPool destructors

            logResults();
        }
    }

    void logResults()
    {
        const long long averageWriteTime = _writeOpNum > 0 ? _writeTime / _writeOpNum : 0;
        const long long averagePopTime = _popOpNum > 0 ? _popTime / _popOpNum : 0;

        std::cout << "Average write time: " << averageWriteTime << " us, "
            << "average pop time: " << averagePopTime << " us" << std::endl;

        std::ofstream results(_resultsFile, std::ios_base::app);
        results << averageWriteTime << "," << averagePopTime << std::endl;
    }

private:
    void writeTransactions()
    {
        {
            std::unique_lock lock(_testStartSync);
            _testStartCv.wait(lock, [this] { return _testStarted; });
        }
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

        // hypothetical system has around 100k currently active users
        std::uniform_int_distribution<long> userDistribution(1, 100000);

        // delay up to 5 ms so that the threads don't all start simultaneously
        std::uniform_int_distribution<int> waitTimeDistribution(0, 5000);
        std::this_thread::sleep_for(std::chrono::microseconds(waitTimeDistribution(_randomGenerator)));

        for (
            auto iterationStart = std::chrono::steady_clock::now();
            iterationStart - start < std::chrono::minutes(1);
            iterationStart = std::chrono::steady_clock::now())
        {
            // emulate an incoming transaction for a random user
            TransactionData transaction = {
                0,                                  // transactionId
                userDistribution(_randomGenerator), // userId
                0,                                  // date
                100.0,                              // amount
                0,                                  // type
                "transaction description"
            };

            auto operationStart = std::chrono::steady_clock::now();
            _cache.write(transaction);
            auto operationEnd = std::chrono::steady_clock::now();

            ++_writeOpNum;
            _writeTime += std::chrono::duration_cast<std::chrono::microseconds>(operationEnd - operationStart).count();

            // make the span between iterations at least 5 ms
            std::this_thread::sleep_for(iterationStart + std::chrono::milliseconds(5) - std::chrono::steady_clock::now());
        }
    }

    void popTransactions()
    {
        {
            std::unique_lock lock(_testStartSync);
            _testStartCv.wait(lock, [this] { return _testStarted; });
        }
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

        // hypothetical system has around 100k currently active users
        std::uniform_int_distribution<long> userDistribution(1, 100000);

        // delay up to 100 ms so that the threads don't all start simultaneously
        std::uniform_int_distribution<int> waitTimeDistribution(0, 100000);
        std::this_thread::sleep_for(std::chrono::microseconds(waitTimeDistribution(_randomGenerator)));

        for (
            auto iterationStart = std::chrono::steady_clock::now();
            iterationStart - start < std::chrono::minutes(1);
            iterationStart = std::chrono::steady_clock::now())
        {
            // flush the accumulated transactions of a random user
            const long userId = userDistribution(_randomGenerator);

            auto operationStart = std::chrono::steady_clock::now();
            auto userTransactions = _cache.pop(userId);
            auto operationEnd = std::chrono::steady_clock::now();

            ++_popOpNum;
            _popTime += std::chrono::duration_cast<std::chrono::microseconds>(operationEnd - operationStart).count();

            // make the span between iterations at least 100 ms
            std::this_thread::sleep_for(iterationStart + std::chrono::milliseconds(100) - std::chrono::steady_clock::now());
        }
    }

    CacheImpl _cache;

    std::atomic<long long> _writeTime;
    std::atomic<long long> _writeOpNum;
    std::atomic<long long> _popTime;
    std::atomic<long long> _popOpNum;

    size_t _writeWorkersNum;
    size_t _popWorkersNum;
    std::string _resultsFile;
    int _testrunsNum;
    bool _testStarted;
    std::mutex _testStartSync;
    std::condition_variable _testStartCv;
    std::mt19937 _randomGenerator;
};

void testCaches(const size_t testedShardSize, const size_t workersNum)
{
    if (testedShardSize == 1)
    {
        auto simpleImplTest = Test<SimpleSynchronizedCache>(
            10, workersNum, workersNum, "simple_cache_tests(" + std::to_string(workersNum) + "_workers).csv");

        simpleImplTest.run();
    }
    else
    {
        auto shardedImplTest = Test<ShardedCache>(
            10, workersNum, workersNum, "sharded_cache_" + std::to_string(testedShardSize) + "_tests(" + std::to_string(workersNum) + "_workers).csv", testedShardSize);

        shardedImplTest.run();
    }
}

int main()
{
    std::cout << "Hardware concurrency: " << hardware_concurrency << std::endl;

    std::vector<size_t> testPlan = { 1, 4, 8, 32, 128, 4096, 100000 };

    for (auto i = 0; i < testPlan.size(); ++i)
        testCaches(testPlan[i], 10);

    // additional runs with a reduced load (fewer workers)
    std::vector<size_t> additionalTestPlan = { 1, 8, 128, 100000 };

    for (auto i = 0; i < additionalTestPlan.size(); ++i)
        testCaches(additionalTestPlan[i], 2);
}

We observe that with 2,000 writes and 300 pops per second (with a concurrency of 8), which are not very high numbers for a high-load system, optimization using sharding significantly accelerates cache performance, by orders of magnitude. However, evaluating the significance of this difference is left to the reader, as, in both scenarios, operations took less than a millisecond. It's important to note that the tests used a relatively lightweight data structure for transactions, and synchronization was applied only to the container itself. In real-world scenarios, data is often more complex and larger, and synchronized processing may require additional computations and access to other data, which can significantly increase the duration of the operation itself. Therefore, we aim to spend as little time on synchronization as possible.

The tests don't show a significant difference in processing time as the shard size grows, while the bigger the size, the bigger the maintenance overhead. So how low should we go? I suspect that the minimum effective value is tied to the system's concurrency, so for modern server machines with much greater concurrency than my home PC, a shard size that is too small won't yield the most optimal results. I would love to see results from other machines with different concurrency that could confirm or disprove this hypothesis, but for now I assume it is optimal to use a shard size several times larger than the concurrency. You can also note that the largest size tested, 100,000, effectively matches the earlier-mentioned approach of assigning a mutex to each user (in the tests, user IDs were generated within the range of 100,000). As can be seen, this didn't provide any advantage in processing speed, and this approach is clearly more demanding in terms of memory.

Limitations and Considerations

So, we determined an optimal shard size, but this is not the only thing that should be considered for the best results.

It's important to remember that such a difference compared to the simple implementation exists only because we are trying to perform a sufficiently large number of transactions at the same time, causing a "queue" to build up. If the system's concurrency and the speed of each operation (within the mutex lock) allow operations to be processed without bottlenecks, the effectiveness of the sharding optimization decreases. To demonstrate this, let's look at the test results with reduced load: at 500 writes and 75 pops (with a concurrency of 8), the difference is still present, but it is no longer as significant. This is yet another reminder that premature optimizations can complicate code without significantly impacting results. It's crucial to understand the application requirements and the expected load.

Also, it's important to note that the effectiveness of sharding heavily depends on the distribution of values of the chosen key (in this case, user ID). If the distribution becomes heavily skewed, we may revert to performance more similar to that of a single mutex; imagine all the transactions coming from a single user.
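
If the skew comes from clustered key values rather than from one dominant key, a possible mitigation (not covered in the article, sketched here as an assumption) is to hash the key before taking the modulus, so that clusters of nearby IDs spread across shards; note that this does not help when a single key dominates the traffic:

#include <functional>

// Hypothetical variation: hash the key before selecting a shard.
// This spreads clustered key values more evenly across shards,
// but a single hot key still maps to a single shard.
size_t shardIndex(long userId, size_t shardSize)
{
    return std::hash<long>{}(userId) % shardSize;
}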

Conclusion

In scenarios with frequent writes to a container in a multi-threaded environment, traditional synchronization methods can become a bottleneck. By leveraging the ability to process data in parallel based on a predictable distribution over some specific key, and implementing a sharded synchronization approach, we can significantly improve performance without sacrificing thread safety. This technique can prove effective for systems dealing with user-specific data, such as transaction processing systems, user session caches, or any scenario where data can be logically partitioned based on a key attribute.

As with any optimization, it's crucial to profile your specific use case and adjust the implementation accordingly. The approach presented here provides a starting point for tackling synchronization challenges in write-heavy, multi-threaded applications.

Remember, the goal of optimization is not just to make things faster, but to make them more efficient and scalable. By thinking critically about your data access patterns and leveraging the inherent structure of your data, you can often find innovative solutions to performance bottlenecks.
