Mettre en place un système de Caching automatique avec MongoDB et Redis
Pour mettre en place un système événementiel, il faut bien sûr des événements 😂. Dans une approche sans événements, si on aimerais faire du caching, on devrais mettre nos opérations ensemble (opération dans la db primaire + opération dans la cache) ou utiliser un worker avec une queue de messagerie.
Dans ce post, nous allons mettre en place un système de caching avec MongoDB comme base de données principale et Redis comme base de données de cache. La partie la plus intéressante est que nous allons utiliser les événements renvoyés par MongoDB à chaque fois qu’une opération est faite.
Nous utiliserons NestJS comme framework, mais le code sera assez simple pour être compréhensible avec ou sans des connaissances préliminaires du framework.
Cet article est un simple aperçu. Nous explorerons le sujet de ChangeStream plus en profondeur plus tard.
Comme plusieurs autres bases de données, MongoDB offre un ensemble d’événements qui se lancent quand des opérations se font dans la base de données.
Ces événements retournent des informations en rapport avec l’opération effectué qui peuvent être utilisé pour faire d’autres taches comme du caching.
Voici une liste des types d’événements disponible
L’observation des flux de changement est une opération qui peut être interrompue ou reprise (on en discutera dans un autre article.).
Notre application est un simple API de CRUD. Cette application est la même utilisé dans l’article sur les tests intégrations avec TestContainers.
Les flux de modifications ne s'exécutent que dans des réplicas ou sharded cluster.
Pour cela, nous allons transformer notre base de données en réplica avec un seul nœud.
Dans MongoDB, un jeu de répliques est une collection de serveurs. Le serveur principal est l'un de ces ordinateurs. Les serveurs secondaires sont les serveurs restants. Les opérations d'écriture sont gérées par le serveur primaire. Les actions sont également appliquées sur les serveurs secondaires. En d'autres termes, les données sont répliquées sur le serveur principal. L'objectif principal de la réplication est d'assurer la redondance et la haute disponibilité des données.
Nous allons trouver le fichier de configuration de mongodb sur notre machine et la modifié. Sur Linux, il se trouve sous /etc/mongodb.conf
1- Ajoutez les détails du jeu de réplicas suivants au fichier mongodb.conf
replication:
replSetName: "<nom du jeu de répliques>"
2- Lancer le jeu de répliques
Pour cela, il suffit de se connecter a mongodb en ligne de commande et taper la commande
rs.initiate()
Le résultat ressemblera à ceci
> rs.initiate()
{
"info2" : "no configuration specified. Using a default configuration for the set",
"me" : "127.0.0.1:27017",
"ok" : 1
}
Et c’est bon.
À partir de là, il ne nous reste plus qu'à écouter la collection qui nous intéresse et à réagir aux événements. Nous voulons faire les choses suivantes selon les événements :
Nous allons commencer par regarder la collection quote
this.changeStream = quotesRepository.getModel().collection.watch();
Maintenant que nous avons le flux de modifications, nous pouvons écouter les modifications et réagir en fonction des types d'opérations.
this.changeStream.on('change', async (event) => {
const documentId = event.documentKey._id.toString();
switch (event.operationType) {
case 'insert':
await this.onDocumentInserted(documentId, event.fullDocument);
break;
case 'update':
await this.onDocumentUpdated(documentId, event.updateDescription);
break;
case 'delete':
await this.onDocumentDeleted(documentId);
break;
default:
this.logger.warn('Event listener operation not supported');
}
});
Voici les opérations à chaque événements:
Insertion
async onDocumentInserted(documentId: string, document: any) {
const doc = this.quotesMapper.quoteDocumentToQuote(document);
try {
await this.redisRepository.add(doc);
} catch (e) {
this.logger.error('Cache insertion error', e);
}
}
Mise à jour
async onDocumentUpdated(documentId: string, updatedDocument: UpdateDescription<QuoteDocument>) {
try {
const oldDocument = await this.redisRepository.get(documentId);
if (!oldDocument) throw new Error('Document not found');
const quote: Quote = {
...oldDocument,
...updatedDocument.updatedFields,
};
await this.redisRepository.update(quote);
} catch (e) {
this.logger.error('Cache update error', e);
}
}
Suppression
async onDocumentDeleted(documentId: string) {
try {
await this.redisRepository.delete(documentId);
} catch (e) {
this.logger.error('Cache deletion error', e);
}
}
Et voila.
Nous pouvons fermer le changestream quand nous le voulons. dans notre cas, nous le fermerons lorsque notre application sera sur le point d'être fermée.
@Injectable()
export class QuotesEventsListenerService {
private readonly changeStream: ChangeStream;
....
async closeChangeStream() {
await this.changeStream.close();
}
}
export class QuotesModule implements OnApplicationShutdown {
constructor(private readonly listenerService: QuotesEventsListenerService) {}
async onApplicationShutdown(signal?: string): any {
try {
await this.listenerService.closeChangeStream();
} catch (e) {}
}
}