The Top Anomaly Detection Techniques You Need to Know

In today’s data-driven world, organizations constantly collect vast amounts of data. Not all data is created equal, however: datasets may contain unexpected or unusual patterns or data points, known as anomalies or outliers. These anomalies can provide valuable insights into data quality, potential risks, and operational efficiency, but detecting them can be challenging, especially in large and complex datasets.

This is where anomaly detection techniques come into play. Anomaly detection is the process of intelligently identifying unusual or unexpected patterns or data points in a dataset. By leveraging the top anomaly detection techniques, organizations can gain insights into their data, prevent fraud, and improve the performance of their models.

What is Anomaly Detection and Its Role in Data Quality?

Anomaly detection refers to identifying unusual or unexpected patterns or data points in a dataset, and it plays a crucial role in ensuring data quality. Data quality refers to the accuracy, completeness, and consistency of data. Poor data quality can lead to wasted resources, since models must be re-trained on new, clean data.

To learn about the 5 most essential data cleaning techniques, follow this article.

Anomaly detection can help identify potential errors or inconsistencies in data, allowing organizations to address these issues and improve data quality. For example, anomaly detection is used in the finance industry to identify unusual or unexpected patterns in financial transactions. These patterns indicate potential fraud, which can be addressed to improve data quality and protect financial assets.

Top Anomaly Detection Techniques

Several methods exist for detecting anomalies in a dataset, each utilizing different features of the data. Let’s dive deeper into the five most popular techniques for anomaly detection.

Statistical Methods

Statistical methods for anomaly detection are based on identifying data points that deviate from expected statistical distributions or patterns. These methods are often simple to implement and can be useful when the dataset is small or when the data is expected to follow a specific statistical distribution. Some common statistical methods for anomaly detection include the percentile and interquartile range (IQR) methods.

Percentile method

The percentile method identifies data points that fall outside a specified percentile range: any point below the lower percentile or above the upper percentile is considered an anomaly.

Here’s an example code snippet demonstrating the percentile anomaly detection method using Python.

import numpy as np

# Generate a random dataset
data = np.random.normal(0, 1, 1000)

# Define the percentile range for anomaly detection
percentile_range = (1, 99)

# Identify the values at the specified percentiles
percentiles = np.percentile(data, percentile_range)

# Identify anomalies
anomalies = np.where((data < percentiles[0]) | (data > percentiles[1]))

# Print the indices of the anomalies
print("Anomalies:", anomalies)

Interquartile range (IQR) method

The interquartile range (IQR) method is based on the range between the first and third quartiles of the dataset. In this method, data points that fall outside a certain IQR range are considered anomalies.

In Python, this method can be implemented as follows.

import numpy as np

# Generate a random dataset
data = np.random.normal(0, 1, 1000)

# Calculate the first and third quartiles of the dataset
q1, q3 = np.percentile(data, [25, 75])

# Calculate the interquartile range of the dataset
iqr = q3 - q1

# Define the IQR range for anomaly detection
iqr_range = (q1 - 1.5 * iqr, q3 + 1.5 * iqr)

# Identify anomalies
anomalies = np.where((data < iqr_range[0]) | (data > iqr_range[1]))

# Print the indices of the anomalies
print("Anomalies:", anomalies)

Clustering-Based Methods

Clustering-based methods for anomaly detection involve grouping similar data points into clusters and then identifying data points that are not part of any cluster or that belong to a cluster significantly different from the others. These methods work well for detecting local anomalies, which occur in a specific region of the dataset, and they can also identify global anomalies, which stand out across the entire dataset.

K-Means Clustering

One popular clustering-based method for anomaly detection is the k-means clustering algorithm. The k-means algorithm groups data points into “k” clusters based on their similarity, typically measured by Euclidean distance in the feature space. Data points that do not fit well into any cluster, or that belong to a cluster with a significantly different distribution, are considered anomalies.

Here is an example code snippet demonstrating the k-means clustering method for anomaly detection.

import numpy as np
from sklearn.cluster import KMeans

# Generate a random dataset
data = np.random.normal(0, 1, (1000, 2))

# Fit the k-means algorithm to the dataset
kmeans = KMeans(n_clusters=5).fit(data)

# Get the distances of each point to its nearest cluster
distances = kmeans.transform(data)
nearest_distances = np.min(distances, axis=1)

# Define a threshold for anomaly detection
threshold = np.percentile(nearest_distances, 95)

# Identify anomalies
anomalies = np.where(nearest_distances > threshold)

# Print the indices of the anomalies
print("Anomalies:", anomalies)

DBSCAN Clustering

Another clustering-based method for anomaly detection is the density-based spatial clustering of applications with noise (DBSCAN) algorithm. The DBSCAN algorithm groups data points into clusters based on their density. Data points not part of any cluster or belonging to a cluster with a significantly different density are considered anomalies.

import numpy as np
from sklearn.cluster import DBSCAN

# Generate a random dataset
data = np.random.normal(0, 1, (1000, 2))

# Fit the DBSCAN algorithm to the dataset
dbscan = DBSCAN(eps=0.5, min_samples=5).fit(data)

# Identify anomalies
anomalies = np.where(dbscan.labels_ == -1)

# Print the indices of the anomalies
print("Anomalies:", anomalies)

Machine Learning-Based Methods

Machine learning-based methods for anomaly detection use supervised or unsupervised algorithms to identify anomalies automatically, rather than relying solely on traditional data statistics. Supervised approaches require a labeled training dataset containing both normal and anomalous data points, while unsupervised approaches learn the structure of the data without labels and flag points that deviate from it.

Supervised Methods

Supervised machine learning algorithms are trained on a labeled dataset that includes both normal and anomalous data points. The algorithm learns to classify new data points as either normal or anomalous based on the features of the data. The performance of the algorithm can be evaluated using metrics such as accuracy, precision, recall, and F1-score.
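
As a minimal sketch of this supervised workflow (the synthetic dataset, the choice of a standard SVM classifier, and the 5% anomaly rate are all illustrative assumptions), a binary classifier can be trained and evaluated as follows.

import numpy as np
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Build a toy labeled dataset: 950 normal points (label 0) and 50 anomalies (label 1)
X = np.vstack([np.random.normal(0, 1, (950, 2)), np.random.normal(4, 1, (50, 2))])
y = np.array([0] * 950 + [1] * 50)

# Hold out a stratified test set so both classes appear in the evaluation
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)

# Train a binary classifier on the labeled data
clf = SVC().fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Evaluate with the metrics mentioned above
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Precision:", precision_score(y_test, y_pred))
print("Recall:", recall_score(y_test, y_pred))
print("F1-score:", f1_score(y_test, y_pred))

On imbalanced anomaly data, precision, recall, and F1-score are usually more informative than accuracy, since a model that predicts “normal” everywhere already achieves high accuracy.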

Support Vector Machines

Support vector machines (SVMs) are a popular machine learning algorithm for anomaly detection. An SVM learns a boundary that separates classes of data points in the feature space; points that fall on the wrong side of that boundary are classified as anomalies. The example below uses scikit-learn’s One-Class SVM, a variant that is trained on (mostly) normal data, learns a boundary around it, and flags points outside that boundary.

import numpy as np
from sklearn.svm import OneClassSVM

# Generate a random dataset
data = np.random.normal(0, 1, (1000, 2))

# Train a One-Class SVM on the dataset
svm = OneClassSVM(gamma='auto').fit(data)

# Predict the anomaly scores for each data point
scores = svm.score_samples(data)

# Define a threshold for anomaly detection
threshold = np.percentile(scores, 5)

# Identify anomalies
anomalies = np.where(scores < threshold)

# Print the indices of the anomalies
print("Anomalies:", anomalies)

Confidence Learning

Confidence learning is a technique in machine learning where a model is trained to predict not only the class or label of a given input but also the confidence or certainty of its prediction. In other words, the model learns to assign a probability or score to each possible class, indicating how confident it is in its prediction.

This is particularly useful in anomaly detection, where identifying rare events or outliers is crucial. By using confidence learning, we can train a model to not only detect anomalies but also to estimate the likelihood or probability that a given input is anomalous.

Here is a sketch of implementing confidence learning in PyTorch. It assumes X_train and X_test are NumPy arrays of flattened 28×28 images scaled to [0, 1], prepared beforehand; since no anomaly labels are available, the model is trained on the reconstruction loss, and the reconstruction error serves as the anomaly score.

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt

class AnomalyDetector(nn.Module):
    def __init__(self):
        super(AnomalyDetector, self).__init__()
        # The encoder compresses each input to a 4-dimensional latent vector
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, 4),
        )
        # The decoder reconstructs the input from the latent vector
        self.decoder = nn.Sequential(
            nn.Linear(4, 8),
            nn.ReLU(),
            nn.Linear(8, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28*28),
            nn.Sigmoid(),
        )
        self.criterion = nn.MSELoss()

    def forward(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat, z

class ConfidenceAnomalyDetector(AnomalyDetector):
    def __init__(self):
        super(ConfidenceAnomalyDetector, self).__init__()
        # The confidence head maps the latent vector to a
        # (normal, anomalous) probability pair
        self.confidence_layer = nn.Sequential(
            nn.Linear(4, 2),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        confidence = self.confidence_layer(z)
        return x_hat, z, confidence

class AnomalyDataset(Dataset):
    def __init__(self, X):
        self.X = torch.Tensor(X)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # Training data is assumed to be normal, so the label is always 0
        return self.X[idx], 0

# X_train and X_test are assumed to be NumPy arrays of flattened
# 28x28 images with shape (n_samples, 784), scaled to [0, 1]
model = ConfidenceAnomalyDetector()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
train_dataloader = DataLoader(AnomalyDataset(X_train), batch_size=128, shuffle=True)
test_dataloader = DataLoader(AnomalyDataset(X_test), batch_size=128)

# Train the model on the reconstruction loss (without labeled anomalies,
# the confidence head receives no gradient here; the reconstruction error
# serves as the anomaly score)
for epoch in range(100):
    for x, _ in train_dataloader:
        optimizer.zero_grad()
        x_hat, z, confidence = model(x)
        loss = model.criterion(x_hat, x)
        loss.backward()
        optimizer.step()

# Score the test set and flag the 5% of samples hardest to reconstruct
model.eval()
errors = []
with torch.no_grad():
    for x, _ in test_dataloader:
        x_hat, z, confidence = model(x)
        errors.append(((x - x_hat) ** 2).mean(dim=1))
errors = torch.cat(errors).numpy()
threshold = np.percentile(errors, 95)
anomalies = np.where(errors > threshold)[0]

# Plot the distribution of reconstruction errors and the threshold
plt.hist(errors, bins=50)
plt.axvline(threshold, color='red', label='threshold')
plt.xlabel('Reconstruction error')
plt.ylabel('Count')
plt.legend()
plt.show()

Unsupervised Methods

Unsupervised machine learning-based methods for anomaly detection involve training models on a dataset without any labels indicating the presence of anomalies. These models can then detect anomalies in new, unseen data.

One of the advantages of unsupervised machine learning-based methods for anomaly detection is that they can be used to detect previously unseen anomalies that were not present in the training data. However, because they do not use labeled data, it can be challenging to determine the severity or importance of the detected anomalies. One popular unsupervised method for anomaly detection is the isolation forest algorithm, which uses decision trees to isolate anomalies from the rest of the data.

Here’s an example of how to implement the isolation forest algorithm in Python using the scikit-learn library.

from sklearn.ensemble import IsolationForest
import numpy as np

# Generate some random data
X = np.random.normal(0, 0.1, size=(1000, 2))
X[:10] = np.random.normal(5, 0.5, size=(10, 2))

# Create and fit the isolation forest model
clf = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
clf.fit(X)

# Predict the anomalies
y_pred = clf.predict(X)

# Get the indices of the anomalies
anomaly_indices = np.where(y_pred == -1)[0]

# Print the indices of the anomalies
print("Anomalies:", anomaly_indices)
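
The contamination parameter tells the model what fraction of the dataset to expect as anomalous, and predict returns -1 for points it considers outliers and 1 for inliers.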

Deep Learning-Based Methods

Deep learning-based methods are also automatic anomaly detectors. Deep learning is itself a subset of machine learning, but deep learning models tend to be more complex: they learn features and predictions directly from the data, unlike classical machine learning models that rely on handcrafted features.

Deep learning-based methods have shown great success in detecting anomalies in high-dimensional datasets. They rely on neural networks with multiple hidden layers to learn complex patterns in the data and identify anomalies as deviations from those learned patterns. Autoencoders are a popular architecture for this task.

Autoencoders are composed of two parts: an encoder and a decoder. The encoder maps the input data to a lower-dimensional latent representation, and the decoder reconstructs the original input from that representation. A trained autoencoder reconstructs anomalous inputs less accurately than normal ones, so a high reconstruction error signals a potential anomaly.

Here is an example of how to implement an autoencoder for anomaly detection on the MNIST dataset.

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torch.utils.data import DataLoader, Dataset

# Define the autoencoder architecture
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 12),
            nn.ReLU(),
            nn.Linear(12, 2),
        )
        self.decoder = nn.Sequential(
            nn.Linear(2, 12),
            nn.ReLU(),
            nn.Linear(12, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28 * 28),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Define the dataset class
class MNISTDataset(Dataset):
    def __init__(self, data, targets):
        self.data = data
        self.targets = targets

    def __getitem__(self, index):
        # Scale pixel values to [0, 1] to match the Sigmoid output and BCE loss
        x = self.data[index].flatten().float() / 255.0
        y = self.targets[index]
        return x, y

    def __len__(self):
        return len(self.data)

# Load the MNIST dataset
train_data = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=torchvision.transforms.ToTensor())
test_data = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=torchvision.transforms.ToTensor())

train_loader = DataLoader(MNISTDataset(train_data.data, train_data.targets), batch_size=64, shuffle=True)
test_loader = DataLoader(MNISTDataset(test_data.data, test_data.targets), batch_size=64, shuffle=False)

# Define the autoencoder model and optimizer
autoencoder = Autoencoder()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

# Train the autoencoder
num_epochs = 10
for epoch in range(num_epochs):
    for data, _ in train_loader:
        data = data.view(data.size(0), -1)
        optimizer.zero_grad()
        recon_data = autoencoder(data)
        loss = nn.BCELoss()(recon_data, data)
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Detect anomalies in the test set
anomaly_scores = []
for data, _ in test_loader:
    data = data.view(data.size(0), -1)
    recon_data = autoencoder(data)
    loss = nn.BCELoss(reduction='none')(recon_data, data)
    loss = loss.sum(dim=1)
    anomaly_scores += loss.tolist()

# Visualize the anomaly scores
import matplotlib.pyplot as plt

plt.hist(anomaly_scores, bins=50)
plt.xlabel('Anomaly Score')
plt.ylabel('Count')
plt.show()
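
Images with the highest reconstruction losses are the anomaly candidates; a detection threshold can be read off this histogram, for example at a high percentile of the scores.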

Time Series-Based Methods

Time series-based methods are instrumental in detecting anomalies in sequential data where the order of the data points matters, for example, audio or video data. LSTM is one of the most popular techniques for anomaly detection in time series data.

LSTM (Long Short-Term Memory) is a specialized type of recurrent neural network (RNN) designed to model long-term dependencies in time series data. Unlike traditional RNNs, LSTM incorporates a memory cell that can store information for a prolonged period, allowing it to handle sequences with long gaps between relevant events. This makes LSTM particularly suitable for time series-based anomaly detection, as it can effectively capture the underlying patterns and dependencies in sequential data. LSTM can offer superior performance over other neural network architectures in various sequence modeling tasks by leveraging its ability to remember past information and selectively discard irrelevant inputs.

In PyTorch, an LSTM-based anomaly detector can be implemented as follows.

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class LSTMAnomalyDetector(nn.Module):
    def __init__(self, hidden_size, num_layers, seq_len):
        super(LSTMAnomalyDetector, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.seq_len = seq_len
        self.encoder = nn.LSTM(input_size=1, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.decoder = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        # Encode the input sequence
        _, (hidden, cell) = self.encoder(x)

        # Repeat the last hidden state across the sequence length;
        # with batch_first=True the decoder expects (batch, seq_len, hidden_size)
        latent = hidden[-1].unsqueeze(1).repeat(1, self.seq_len, 1)

        # Decode the latent representation
        outputs, _ = self.decoder(latent, (hidden, cell))

        # Pass through the linear layer to get the reconstructed sequence
        reconstructed = self.linear(outputs)
        return reconstructed

# Load training data
train_data = np.load('train_data.npy')

# Normalize training data
train_data = (train_data - np.mean(train_data)) / np.std(train_data)

# Set hyperparameters
hidden_size = 64
num_layers = 2
seq_len = 24
batch_size = 64
epochs = 50

# Initialize model
model = LSTMAnomalyDetector(hidden_size=hidden_size, num_layers=num_layers, seq_len=seq_len)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train model
for epoch in range(epochs):
    running_loss = 0.0
    num_batches = 0
    for i in range(0, len(train_data) - seq_len, batch_size):
        # Build a (batch=1, seq_len, input_size=1) tensor from the next window
        inputs = torch.FloatTensor(train_data[i:i + seq_len]).unsqueeze(0).unsqueeze(-1)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, inputs)
        loss.backward()
        optimizer.step()

        # Accumulate statistics
        running_loss += loss.item()
        num_batches += 1
    print(f'Epoch {epoch+1}, Loss: {running_loss/num_batches:.6f}')

# Save trained model weights
torch.save(model.state_dict(), 'lstm_anomaly_detector.pt')

import matplotlib.pyplot as plt

# Load test data
test_data = np.load('test_data.npy')

# Normalize test data
test_data = (test_data - np.mean(test_data)) / np.std(test_data)

# Initialize model and load trained weights
model = LSTMAnomalyDetector(hidden_size=hidden_size, num_layers=num_layers, seq_len=seq_len)
model.load_state_dict(torch.load('lstm_anomaly_detector.pt'))
model.eval()

# Set threshold for anomaly detection
threshold = 0.01

# Evaluate model on test data
anomalies = []
with torch.no_grad():
    for i in range(len(test_data) - seq_len):
        inputs = torch.FloatTensor(test_data[i:i + seq_len]).unsqueeze(0).unsqueeze(-1)
        outputs = model(inputs)
        loss = torch.mean((inputs - outputs) ** 2)
        # Flag the point at the end of a poorly reconstructed window
        if loss.item() > threshold:
            anomalies.append(i + seq_len)

# Plot the original time series with the detected anomalies
plt.plot(test_data, label='Original')
plt.scatter(anomalies, test_data[anomalies], color='r', label='Anomaly')
plt.xlabel('Time')
plt.ylabel('Normalized value')
plt.legend()
plt.show()
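
The fixed threshold of 0.01 is arbitrary; in practice, it is better derived from the distribution of reconstruction errors on held-out normal data, for example as a high percentile of those errors.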

Implementing Anomaly Detection Techniques in Your Data Pipeline

Simpler methods like statistical or clustering techniques work well on small-scale datasets. However, much of the data available today is very high-dimensional, requiring the aid of complex deep learning models. Such models take a long time to train due to their large number of parameters.

Even if you are prepared to spend the resources to train a complex model for anomaly detection, once the anomalous samples are detected, you need to correct the dataset and re-train the model. This is often computationally infeasible and extremely time-consuming.

Coresets offer an intelligent solution to this problem: they are a much smaller, weighted subset of the entire dataset, where the samples are chosen such that solving the problem on the Coreset will yield the same solution (such as the parameter values of an ML model) as solving it on the original dataset. The DataHeroes Coreset tree structure allows you to re-train your model on the Coreset in near real-time, making the clean-and-retrain cycle practical.

Say you have a dataset with 50 million samples distributed across 5 classes. The DataHeroes library (installed in Python using pip install dataheroes) allows you to create a Coreset of this dataset very easily using the following code:

from dataheroes import CoresetTreeServiceLG

# Build the Coreset tree service object
# (data_params, which describes the dataset's structure, and train_file_path
# are assumed to be defined as per the DataHeroes documentation)
service_obj = CoresetTreeServiceLG(data_params=data_params,
                                   optimized_for='cleaning',
                                   n_classes=5,
                                   n_instances=50_000_000
                                  )
service_obj.build_from_file(train_file_path)

The DataHeroes Coresets have a property called “importance” that can help identify anomalies in the dataset. For example, say the 50M-sample dataset above consists of natural images such as those shown below. The image of the indoor horse is clearly uncommon for that class distribution, whereas an indoor dog or an outdoor cat is not out of the ordinary.

[Image: DataHeroes Coreset samples - an indoor horse, an indoor dog, and an outdoor cat]

Reviewing the data samples that obtained a very high “importance” value can help identify potential labeling errors, which can then be corrected before rebuilding the Coreset. Viewing the most important samples in the Coreset is easily done using the following:

# Get the top "num_samples_to_view" most important samples from the class 'horse'
result = service_obj.get_important_samples(
    class_size={'horse': num_samples_to_view}
)

Now that you have identified the anomalous samples, you can either update their labels or remove them completely to make the Coreset more reliable. You can do this easily using the following code.

# Update the labels of some anomalous samples to the 'Dog' class
service_obj.update_targets(indices, y=['Dog'] * len(indices))

# Remove the sample indices stored in the array `idxes_to_delete` from the Coreset
service_obj.remove_samples(indices=idxes_to_delete)

Follow these detailed articles to find and fix labels in image classification, object detection, semantic segmentation and NLP datasets.

Once you have fixed the sample labels or removed them, you can train your ML model on the Coreset, and retrain it as needed to optimize the model hyperparameters, all in a fraction of the time required to work with the whole dataset.

Conclusion

Anomaly detection can improve data quality by identifying errors and inconsistencies, improving data preprocessing, enhancing data validation, preventing fraud and misuse, and improving product and service quality. By leveraging anomaly detection techniques, organizations can gain insights into their data, prevent errors and fraud, and improve the accuracy and reliability of their data.

Whether you are working in finance, cybersecurity, manufacturing, or healthcare, anomaly detection is a crucial tool that can help you gain insights into your data and make better decisions. By staying up-to-date with the latest developments in anomaly detection, you can continue to improve your data analysis capabilities and gain a competitive edge in your respective industry.
