Anomaly Detection using Sigma Rules (Part 5) Flux Capacitor Optimization

To boost performance, we implement a forgetful bloom filter and a custom Spark state store provider.

Photo by Leora Winter on Unsplash, Shippagan, NB, Canada

This is the 5th article of our series. Refer to part 1, part 2, part 3 and part 4 for some context.

In our previous articles, we demonstrated the performance gains achieved by using a bloom filter. We also showed how we leveraged a bloom filter to implement temporal proximity correlations and parent/child and ancestor relationships.

So far we have been using a single bloom per host. Eventually the bloom filter gets saturated with tags and issues a lot of false positives. Using this online bloom filter calculator, we can see the probability of getting a false positive. Notice how the false positive rate quickly increases past 200,000 tags.

(This graph is for n=200,000 and p=1%. Image by Author)

Forgetful Bloom Filter

What we need is a way to age off very old tags: a forgetful bloom filter. As explained in the excellent Redis Labs paper Age-Partitioned Bloom Filters, there are many ways to build one. We will use the most basic approach:

"Segmentation based approaches use several disjoint segments which can be individually added and retired. The most naïf and several times mentioned approach is a sequence of plain BFs, one per generation, adding a new one and retiring the oldest when the one in use gets full."

We chose to use 10 generations. Thus we use 10 bloom filters per host, each capable of holding up to 20,000 tags.

We use an "active" bloom to insert new tags. When the "active" bloom is full, we create a new one. When we reach 10 blooms, we discard the oldest bloom. We query for tags by testing the "active" bloom first; if the tag is not found, we test the next (older) bloom, and so on until we reach the end. (A sketch of this rotation is shown at the end of this section.)

Notice that for every tag we want to test, we can potentially perform 10 tests against 10 different blooms. Each test has a certain probability of reporting a false positive, so with 10 blooms the overall false positive probability is roughly 10 times higher (1 - (1 - p)^10 ≈ 10p). To compensate, we use blooms with an fpp of 1/1000 rather than 1/100. In fact, we will show we can even use an fpp of 1/10000.

In order to accommodate multiple blooms, we no longer store a bloom object in the state store:

    val stateEncoder = Encoders.javaSerialization(classOf[BloomFilter])

Rather, we persist a FluxState object holding a list of bloom filters:

    val stateEncoder = Encoders.product[FluxState]

The FluxState has the following fields:

    case class FluxState(
        var version: Long = 0,
        var active: Int = 0,
        var serializedBlooms: List[Array[Byte]] = List()
    ) extends TagCache {

For performance reasons, we serialize the bloom filters ourselves. Since we know the size of these objects, we can optimize the serialization by pre-allocating the serialization buffers. The serializedBlooms field holds the serialized blooms, and the active field keeps track of the index of the active bloom within this list. We will explain the use of the version number a bit later.
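To make the generation rotation concrete, here is a minimal standalone sketch of the insert and query logic described above. It assumes Spark's org.apache.spark.util.sketch.BloomFilter and uses a simple insert counter to decide when the active bloom is full; the ForgetfulBloom class and its parameter names are illustrative, not the actual Flux Capacitor implementation.

    import org.apache.spark.util.sketch.BloomFilter
    import scala.collection.mutable.ListBuffer

    // Illustrative age-partitioned ("forgetful") bloom filter:
    // numBlooms generations, each sized for tagCapacity / numBlooms tags.
    class ForgetfulBloom(tagCapacity: Int, fpp: Double, numBlooms: Int = 10) {
      private val perBloom = tagCapacity / numBlooms
      // The head of the list is the "active" (newest) generation.
      private val blooms = ListBuffer(BloomFilter.create(perBloom, fpp))
      private var activeCount = 0L

      def put(tag: String): Unit = {
        blooms.head.put(tag)
        activeCount += 1
        if (activeCount >= perBloom) {
          // The active bloom is full: start a new generation and retire the oldest.
          blooms.prepend(BloomFilter.create(perBloom, fpp))
          if (blooms.size > numBlooms) blooms.remove(blooms.size - 1)
          activeCount = 0
        }
      }

      // Test the active bloom first, then each older generation.
      def mightContain(tag: String): Boolean = blooms.exists(_.mightContain(tag))
    }

For example, a ForgetfulBloom(200000, 0.001) behaves like a 200,000-tag bloom whose oldest 20,000 tags are eventually forgotten as new generations are created.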
This is how we serialize the blooms:

    val padding = 4000
    val n = tagCapacity / NUM_BLOOMS
    // Formula taken from https://hur.st/bloomfilter
    // m = ceil( (n * log(p)) / log(1 / pow(2, log(2))))
    val mBits = Math.ceil(
      (n * Math.log(desiredFpp)) / Math.log(1 / Math.pow(2, Math.log(2))))
    val numBytes = (mBits / 8).toInt + padding
    val byteArrayOut = new ByteArrayOutputStream(numBytes)
    val store = new ObjectOutputStream(byteArrayOut)
    store.writeObject(bloom)
    store.close()
    byteArrayOut.toByteArray()

Efficient Checkpointing

We segmented our large bloom into 10 smaller ones. Due to the nature of bloom filters, the space used by 10 blooms of 20,000 tags each is roughly the same as that of a single 200,000-tag bloom: about 200KiB.

The Spark HDFS state store provider keeps all the FluxState objects in memory. For a fleet of 50,000 hosts, 50,000 × 200KiB works out to about 10GiB of RAM. However, the memory usage of the HDFS state store is measured at 25GiB.

(Image by Author)

The reason it is much higher is that the HDFS state store keeps 2 copies of the states by default. We can change it to store a single copy using spark.sql.streaming.maxBatchesToRetainInMemory (a configuration sketch is shown at the end of this section). This brings memory usage down to about 12GiB of RAM, which roughly matches our estimate.

As part of checkpointing, Spark writes all the states out to the data lake, and it does this after every micro-batch completes. Spark thus spends a lot of time persisting 12GiB of state, over and over again.

However, during every micro-batch we only modify 1 of the 10 blooms (the active bloom). The other 9 blooms might be queried but remain unchanged. The default HDFS state store provider is unaware of which bloom changed; it simply persists the whole FluxState object. If the state store provider knew which bloom is the active one, it could be more efficient and checkpoint only the modified active bloom, potentially cutting serialization down to 1/10th of the 12GiB.
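For reference, here is a sketch of how the retention setting mentioned above could be applied when building the session. Only the configuration key comes from this article; the builder call and application name are illustrative.

    import org.apache.spark.sql.SparkSession

    // Keep a single copy of each state in memory instead of the default of 2.
    // (Configuration key from this article; the app name is illustrative.)
    val spark = SparkSession.builder()
      .appName("flux-capacitor")
      .config("spark.sql.streaming.maxBatchesToRetainInMemory", "1")
      .getOrCreate()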

Custom State Store Provider

The HDFSBa