Transactions
Chapter 29: Transactions & Concurrency
Section titled “Chapter 29: Transactions & Concurrency”Managing Data Integrity with Transactions
Section titled “Managing Data Integrity with Transactions”29.1 Transaction Overview
Section titled “29.1 Transaction Overview”Transactions ensure that multiple database operations are executed as a single atomic unit.
Transaction ACID Properties ================================================================================
+------------------+--------------------------------------------------+ | Property | Description | +------------------+--------------------------------------------------+ | Atomicity | All operations succeed or all fail | | Consistency | Database remains in valid state | | Isolation | Concurrent transactions don't interfere | | Durability | Committed changes are permanent | +------------------+--------------------------------------------------+
Transaction Flow:
BEGIN TRANSACTION | v +------------------+ | Operation 1 | +------------------+ | v +------------------+ | Operation 2 | +------------------+ | v +------------------+ | Operation 3 | +------------------+ | v +--------+ +--------+ | COMMIT | or | ROLLBACK| +--------+ +--------+
================================================================================29.2 Transaction Using DataSource
Section titled “29.2 Transaction Using DataSource”Basic Transaction
// Section: Basic Transaction
import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { User } from './user.entity';
import { Profile } from '../profiles/profile.entity';

/**
 * Creates users through the injected TypeORM DataSource.
 */
@Injectable()
export class UsersService {
  constructor(
    // InjectDataSource is exported by @nestjs/typeorm, not @nestjs/common.
    @InjectDataSource()
    private dataSource: DataSource,
  ) {}

  /**
   * Creates a user and its profile inside a single transaction.
   *
   * @param userData - Fields used to build the new User.
   * @param profileData - Fields used to build the Profile linked to that user.
   * @returns The saved user, reloaded with its `profile` relation.
   * @throws Whatever the transaction callback throws, including the rejection
   *   from `findOneOrFail` if the reload finds no row.
   */
  async createUserWithProfile(
    userData: Partial<User>,
    profileData: Partial<Profile>,
  ): Promise<User> {
    return this.dataSource.transaction(async (manager) => {
      // Create user
      const user = manager.create(User, userData);
      await manager.save(user);

      // Create profile pointing at the user saved above
      const profile = manager.create(Profile, { ...profileData, user });
      await manager.save(profile);

      // findOne may resolve to null, which would contradict Promise<User>;
      // findOneOrFail rejects instead.
      return manager.findOneOrFail(User, {
        where: { id: user.id },
        relations: ['profile'],
      });
    });
  }
}
// Section: Transaction with Error Handling
/**
 * Moves `amount` from one account to another inside a single transaction and
 * records the transfer.
 *
 * @param fromAccountId - Account to debit.
 * @param toAccountId - Account to credit.
 * @param amount - Amount to move; must be greater than zero.
 * @throws BadRequestException when the amount is not positive, both ids are
 *   the same, or the source balance is too low.
 * @throws NotFoundException when either account does not exist.
 */
async transferFunds(
  fromAccountId: number,
  toAccountId: number,
  amount: number,
): Promise<void> {
  // A negative amount would silently reverse the transfer; NaN fails this check too.
  if (!(amount > 0)) {
    throw new BadRequestException('Transfer amount must be positive');
  }

  // Loading the same row twice and saving both copies would let one write clobber the other.
  if (fromAccountId === toAccountId) {
    throw new BadRequestException('Cannot transfer to the same account');
  }

  await this.dataSource.transaction(async (manager) => {
    const lockAccount = (id: number) =>
      manager.findOne(Account, {
        where: { id },
        lock: { mode: 'pessimistic_write' },
      });

    // Always lock the lower id first so two opposite transfers cannot each
    // hold one row while waiting for the other.
    const debitFirst = fromAccountId < toAccountId;
    const first = await lockAccount(debitFirst ? fromAccountId : toAccountId);
    const second = await lockAccount(debitFirst ? toAccountId : fromAccountId);
    const fromAccount = debitFirst ? first : second;
    const toAccount = debitFirst ? second : first;

    if (!fromAccount || !toAccount) {
      throw new NotFoundException('Account not found');
    }

    if (fromAccount.balance < amount) {
      throw new BadRequestException('Insufficient funds');
    }

    // Update balances
    fromAccount.balance -= amount;
    toAccount.balance += amount;

    await manager.save([fromAccount, toAccount]);

    // Create transaction record
    await manager.insert(Transaction, {
      fromAccountId,
      toAccountId,
      amount,
      type: 'transfer',
    });
  });
}
Section titled “29.3 Transaction Using EntityManager”Using Transaction Manager
// Section: Using Transaction Manager
import { Injectable, NotFoundException } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { User } from './user.entity';
import { Order } from '../orders/order.entity';
// NOTE(review): these two paths mirror the imports used by the chapter's
// complete example — confirm them against the real project layout.
import { OrderItem } from '../order-items/order-item.entity';
import { Product } from '../products/product.entity';

/**
 * Creates orders using the injected EntityManager's transaction helper.
 */
@Injectable()
export class OrdersService {
  constructor(
    private entityManager: EntityManager,
  ) {}

  /**
   * Creates an order, its items, and decrements product stock in one transaction.
   *
   * @param userId - Id of the user placing the order.
   * @param orderData - Extra fields spread into the new Order.
   * @param items - Order lines; each is expected to carry `productId` and `quantity`.
   * @returns The saved order.
   * @throws NotFoundException when the user does not exist.
   */
  async createOrder(
    userId: number,
    orderData: any,
    items: any[],
  ): Promise<Order> {
    return this.entityManager.transaction(async (manager) => {
      // Verify user exists
      const user = await manager.findOne(User, {
        where: { id: userId },
      });

      if (!user) {
        throw new NotFoundException('User not found');
      }

      // Create order
      const order = manager.create(Order, {
        ...orderData,
        userId,
        status: 'pending',
      });
      await manager.save(order);

      // Create order items; nothing to insert for an empty item list.
      if (items.length > 0) {
        const orderItems = items.map((item) => ({
          ...item,
          orderId: order.id,
        }));
        await manager.insert(OrderItem, orderItems);
      }

      // Update inventory
      // NOTE(review): the decrement is unconditional — stock is never compared
      // with the requested quantity in this snippet.
      for (const item of items) {
        await manager.decrement(
          Product,
          { id: item.productId },
          'stock',
          item.quantity,
        );
      }

      return order;
    });
  }
}
Section titled “29.4 Transaction Using QueryRunner”Manual Transaction Control
// Section: Manual Transaction Control
import { Injectable } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource, QueryRunner } from 'typeorm';
// NOTE(review): Payment and Account are used below but not imported in this
// snippet — add their imports from wherever those entities live.

/**
 * Records payments with explicit QueryRunner transaction control.
 */
@Injectable()
export class PaymentsService {
  constructor(
    // InjectDataSource is exported by @nestjs/typeorm, not @nestjs/common.
    @InjectDataSource()
    private dataSource: DataSource,
  ) {}

  /**
   * Inserts a payment and credits the account balance in one transaction.
   *
   * @param paymentData - Payment row to insert; `accountId` and `amount` are
   *   also read to increment the account balance.
   * @throws Rethrows any failure after rolling back an active transaction.
   */
  async processPayment(paymentData: any): Promise<void> {
    // Create query runner
    const queryRunner: QueryRunner = this.dataSource.createQueryRunner();

    try {
      // Connecting and starting the transaction happen inside the try so the
      // finally block releases the runner even when one of them fails.
      await queryRunner.connect();
      await queryRunner.startTransaction();

      // Execute operations
      await queryRunner.manager.insert(Payment, paymentData);

      await queryRunner.manager.increment(
        Account,
        { id: paymentData.accountId },
        'balance',
        paymentData.amount,
      );

      // Commit transaction
      await queryRunner.commitTransaction();
    } catch (error) {
      // Only roll back if a transaction was actually opened; otherwise the
      // rollback call itself would fail and hide the original error.
      if (queryRunner.isTransactionActive) {
        await queryRunner.rollbackTransaction();
      }
      throw error;
    } finally {
      // Release query runner
      await queryRunner.release();
    }
  }
}
// Section: Transaction with Savepoints
/**
 * Runs a main transaction with one optional step protected by a savepoint.
 *
 * The Profile insert is best-effort: if it fails, only the work done since the
 * savepoint is undone and the outer transaction continues.
 *
 * @throws Rethrows any failure of the outer transaction after rolling it back.
 */
async complexOperation(): Promise<void> {
  const queryRunner = this.dataSource.createQueryRunner();

  try {
    // Inside the try so the runner is released even if these fail.
    await queryRunner.connect();
    await queryRunner.startTransaction();

    // Main operation
    await queryRunner.manager.insert(User, { name: 'John' });

    // Create savepoint. startTransaction/commitTransaction/rollbackTransaction
    // do not accept a savepoint name, so the savepoint is managed with SQL.
    await queryRunner.query('SAVEPOINT savepoint_1');

    try {
      // Risky operation
      await queryRunner.manager.insert(Profile, { bio: 'Test' });

      // Release savepoint
      await queryRunner.query('RELEASE SAVEPOINT savepoint_1');
    } catch {
      // Rollback to savepoint, then continue with the main transaction.
      // The failure is deliberately not rethrown.
      await queryRunner.query('ROLLBACK TO SAVEPOINT savepoint_1');
    }

    // More operations
    await queryRunner.manager.insert(Log, { action: 'user_created' });

    // Commit main transaction
    await queryRunner.commitTransaction();
  } catch (error) {
    // Only roll back a transaction that was actually opened.
    if (queryRunner.isTransactionActive) {
      await queryRunner.rollbackTransaction();
    }
    throw error;
  } finally {
    await queryRunner.release();
  }
}
Section titled “29.5 Transaction Isolation Levels”Isolation Level Options
Section titled “Isolation Level Options” Transaction Isolation Levels ================================================================================
+-------------------+------------------------------------------+ | Level | Description | +-------------------+------------------------------------------+ | READ UNCOMMITTED| Can read uncommitted changes | | READ COMMITTED | Only read committed changes | | REPEATABLE READ| Same read returns same data | | SERIALIZABLE | Full isolation, no concurrent effects | +-------------------+------------------------------------------+
Phenomena Prevented:
+-------------------+-------+-------+-------+ | Level | Dirty | Non- | Phantom| | | Read | Repeat| Read | +-------------------+-------+-------+-------+ | READ UNCOMMITTED | No | No | No | | READ COMMITTED | Yes | No | No | | REPEATABLE READ | Yes | Yes | No | | SERIALIZABLE | Yes | Yes | Yes | +-------------------+-------+-------+-------+
================================================================================Setting Isolation Level
// Section: Setting Isolation Level
import { IsolationLevel } from 'typeorm';

// The isolation level is passed as a plain string in first position,
// not wrapped in an options object.
const isolationLevel: IsolationLevel = 'SERIALIZABLE';

// Using DataSource
await this.dataSource.transaction(isolationLevel, async (manager) => {
  // Operations with SERIALIZABLE isolation.
  // findOneOrFail rejects when the row is missing instead of resolving to
  // null, so the balance update below never dereferences null.
  const user = await manager.findOneOrFail(User, { where: { id: 1 } });
  user.balance += 100;
  await manager.save(user);
});

// Using QueryRunner
const queryRunner = this.dataSource.createQueryRunner();

try {
  // Inside the try so the runner is released even if these fail.
  await queryRunner.connect();
  await queryRunner.startTransaction(isolationLevel);

  // Operations

  await queryRunner.commitTransaction();
} catch (error) {
  // Only roll back a transaction that was actually opened.
  if (queryRunner.isTransactionActive) {
    await queryRunner.rollbackTransaction();
  }
  throw error;
} finally {
  await queryRunner.release();
}
Section titled “29.6 Pessimistic Locking”Lock Modes
Section titled “Lock Modes” Pessimistic Lock Modes ================================================================================
+-------------------+------------------------------------------+ | Mode | Description | +-------------------+------------------------------------------+ | pessimistic_read | Shared lock; others can read but not modify | | pessimistic_write| Exclusive lock; others cannot lock or modify the rows (plain reads still proceed on MVCC databases such as PostgreSQL) | | pessimistic_partial_write| Exclusive lock that skips already-locked rows (SKIP LOCKED) | | pessimistic_write_or_fail| Exclusive lock that errors immediately if the row is locked (NOWAIT) | | for_no_key_update| No key update lock (PostgreSQL) | | for_key_share | Key share lock (PostgreSQL) | +-------------------+------------------------------------------+
================================================================================Using Pessimistic Locks
// Section: Using Pessimistic Locks
// Each example uses its own variable name so the snippets can sit in one scope.

// Shared lock (pessimistic_read)
const sharedLockUser = await this.dataSource.transaction(async (manager) => {
  return manager.findOne(User, {
    where: { id: 1 },
    lock: { mode: 'pessimistic_read' },
  });
});

// Exclusive lock (pessimistic_write)
const exclusiveLockUser = await this.dataSource.transaction(async (manager) => {
  return manager.findOne(User, {
    where: { id: 1 },
    lock: { mode: 'pessimistic_write' },
  });
});

// Lock with QueryBuilder
const queryBuilderUser = await this.dataSource.transaction(async (manager) => {
  return manager
    .createQueryBuilder(User, 'user')
    .setLock('pessimistic_write')
    .where('user.id = :id', { id: 1 })
    .getOne();
});

// Lock without waiting (PostgreSQL). This is not a timeout:
// 'nowait' raises an error straight away if the row is already locked,
// 'skip_locked' leaves already-locked rows out of the result instead.
// Like the examples above, the locking read runs inside a transaction.
const noWaitUser = await this.dataSource.transaction(async (manager) => {
  return manager.findOne(User, {
    where: { id: 1 },
    lock: {
      mode: 'pessimistic_write',
      onLocked: 'nowait', // or 'skip_locked'
    },
  });
});
// Section: Bank Transfer Example with Locking
/**
 * Transfers `amount` between two accounts under pessimistic write locks and
 * records the completed transfer.
 *
 * @param fromId - Account to debit.
 * @param toId - Account to credit.
 * @param amount - Amount to move; must be greater than zero.
 * @throws BadRequestException when the amount is not positive, both ids are
 *   the same, or the source balance is too low.
 * @throws NotFoundException when either account does not exist.
 */
async transferMoney(
  fromId: number,
  toId: number,
  amount: number,
): Promise<void> {
  // A negative amount would silently reverse the transfer; NaN fails this check too.
  if (!(amount > 0)) {
    throw new BadRequestException('Transfer amount must be positive');
  }

  // Two copies of the same row saved together would overwrite each other.
  if (fromId === toId) {
    throw new BadRequestException('Cannot transfer to the same account');
  }

  await this.dataSource.transaction(async (manager) => {
    const lockAccount = (id: number) =>
      manager.findOne(Account, {
        where: { id },
        lock: { mode: 'pessimistic_write' },
      });

    // Lock both accounts in a consistent order (lower id first, one after the
    // other) to prevent deadlock. Promise.all would issue the locks in caller
    // order, which differs between A->B and B->A transfers.
    const debitFirst = fromId < toId;
    const first = await lockAccount(debitFirst ? fromId : toId);
    const second = await lockAccount(debitFirst ? toId : fromId);
    const fromAccount = debitFirst ? first : second;
    const toAccount = debitFirst ? second : first;

    if (!fromAccount || !toAccount) {
      throw new NotFoundException('Account not found');
    }

    if (fromAccount.balance < amount) {
      throw new BadRequestException('Insufficient funds');
    }

    // Perform transfer
    fromAccount.balance -= amount;
    toAccount.balance += amount;

    await manager.save([fromAccount, toAccount]);

    // Record transaction
    await manager.insert(Transaction, {
      fromAccountId: fromId,
      toAccountId: toId,
      amount,
      type: 'transfer',
      status: 'completed',
    });
  });
}
Section titled “29.7 Optimistic Locking”Using Version Column
// Section: Using Version Column
import { Entity, PrimaryGeneratedColumn, Column, VersionColumn } from 'typeorm';

/**
 * Product row with a version column used for optimistic locking.
 */
@Entity()
export class Product {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  name: string;

  // Database drivers commonly return DECIMAL/NUMERIC values as strings to
  // avoid precision loss. The transformer converts on read so the field
  // really holds a number, as its type says.
  @Column('decimal', {
    transformer: {
      to: (value: number) => value,
      from: (value: string | null) => (value == null ? value : Number(value)),
    },
  })
  price: number;

  @Column({ default: 0 })
  stock: number;

  // Version column for optimistic locking
  @VersionColumn()
  version: number;
}
// Usage
/**
 * Decrements a product's stock using an optimistic, version-checked update.
 *
 * The row is only updated if its version still matches the one that was read,
 * so a concurrent modification is detected from the affected-row count rather
 * than by searching an error message for the word "version".
 *
 * @param id - Product id.
 * @param quantity - Amount to subtract from `stock`.
 * @returns The product as stored after the update.
 * @throws NotFoundException when the product does not exist.
 * @throws ConflictException when the row changed between the read and the update.
 */
async updateStock(id: number, quantity: number): Promise<Product> {
  const product = await this.productRepository.findOne({ where: { id } });

  if (!product) {
    throw new NotFoundException('Product not found');
  }

  // Compare-and-swap on the version column: the WHERE clause matches no rows
  // if another writer has bumped the version since the read above.
  const result = await this.productRepository
    .createQueryBuilder()
    .update(Product)
    .set({
      stock: product.stock - quantity,
      version: () => 'version + 1',
    })
    .where('id = :id AND version = :version', {
      id,
      version: product.version,
    })
    .execute();

  if (result.affected === 0) {
    throw new ConflictException(
      'Product was modified by another transaction. Please retry.',
    );
  }

  // Reload so the caller sees the stored stock and version.
  return this.productRepository.findOneOrFail({ where: { id } });
}
Section titled “Optimistic Locking Flow” Optimistic Locking Flow ================================================================================
Transaction 1 Transaction 2 | | v v Read Product (v=1) Read Product (v=1) | | v v Modify stock Modify stock | | v v Save (check v=1) Save (check v=1) | | v v Success (v=2) Fail! (v != 1) | v Retry or Error
================================================================================29.8 Deadlock Prevention
Section titled “29.8 Deadlock Prevention”Deadlock Scenarios
Section titled “Deadlock Scenarios” Deadlock Scenario ================================================================================
Transaction 1 Transaction 2 | | v v Lock Account A Lock Account B | | v v Wait for Account B Wait for Account A | | v v +-------+ +-------+ | BLOCK |<-----> DEADLOCK <--------->| BLOCK | +-------+ +-------+
Prevention: Always lock resources in a consistent order
================================================================================Deadlock Prevention Strategies
// Section: Deadlock Prevention Strategies
// Strategy 1: Lock in consistent order
/**
 * Skeleton of a transfer that always locks the lower account id first.
 * The transfer itself is left out of this example.
 */
async transfer(fromId: number, toId: number, amount: number) {
  // Swap only when the first id is strictly greater, so the lower id leads.
  const lockOrder = fromId - toId > 0 ? [toId, fromId] : [fromId, toId];
  const lowerId = lockOrder[0];
  const higherId = lockOrder[1];

  await this.dataSource.transaction(async (manager) => {
    const lockById = (accountId: number) =>
      manager.findOne(Account, {
        where: { id: accountId },
        lock: { mode: 'pessimistic_write' },
      });

    // Lock in order, one after the other
    const first = await lockById(lowerId);
    const second = await lockById(higherId);

    // Perform transfer
    // ...
  });
}
// Strategy 2: Use NOWAIT or SKIP LOCKED
/**
 * Locks an order for processing without waiting on a competing lock.
 *
 * @param orderId - Id of the order to process.
 * @throws ConflictException when another transaction already holds the row lock.
 * @throws NotFoundException when no order has this id.
 */
async processOrder(orderId: number) {
  await this.dataSource.transaction(async (manager) => {
    let order: Order | null;

    try {
      order = await manager
        .createQueryBuilder(Order, 'order')
        .setLock('pessimistic_write')
        // The on-locked behaviour has its own builder method; it is not a
        // second argument of setLock.
        .setOnLocked('nowait')
        .where('order.id = :id', { id: orderId })
        .getOne();
    } catch (error) {
      // With NOWAIT a locked row makes the query fail rather than return
      // nothing. 55P03 is PostgreSQL's lock_not_available code.
      if ((error as { code?: unknown } | null)?.code === '55P03') {
        throw new ConflictException('Order is being processed by another transaction');
      }
      throw error;
    }

    // An empty result therefore means the order does not exist.
    if (!order) {
      throw new NotFoundException('Order not found');
    }

    // Process order
  });
}
// Strategy 3: Retry on deadlock
/**
 * Runs `operation`, retrying it with exponential backoff when it fails with a
 * deadlock error.
 *
 * @param operation - Async work to run; called again from scratch on each retry.
 * @param maxRetries - Total number of attempts. Values below 1 (or NaN) are
 *   treated as a single attempt.
 * @returns The value resolved by the first successful attempt.
 * @throws The error of the last attempt, or immediately any non-deadlock error.
 */
async withRetry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 3,
): Promise<T> {
  // Always run at least once; otherwise the loop would be skipped and there
  // would be no error to throw.
  const attempts = maxRetries >= 1 ? Math.ceil(maxRetries) : 1;

  for (let i = 0; ; i++) {
    try {
      return await operation();
    } catch (error) {
      const isLastAttempt = i >= attempts - 1;

      // Give up on the final attempt or on anything that is not a deadlock.
      if (isLastAttempt || !this.isDeadlockError(error)) {
        throw error;
      }

      // Exponential backoff: 100ms, 200ms, 400ms, ...
      await this.sleep(100 * 2 ** i);
    }
  }
}
/**
 * Tells whether a caught value is a PostgreSQL deadlock error.
 *
 * @param error - Any caught value; it may be null or undefined if something
 *   non-standard was thrown.
 * @returns True when the value carries the code `40P01`.
 */
private isDeadlockError(error: any): boolean {
  // PostgreSQL deadlock error code. Optional chaining keeps a thrown
  // null/undefined from raising a TypeError here.
  return error?.code === '40P01';
}
/**
 * Resolves after the given delay.
 *
 * @param ms - Delay in milliseconds passed to setTimeout.
 */
private sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
// Section: 29.9 Complete Transaction Example
import {
  Injectable,
  NotFoundException,
  BadRequestException,
  ConflictException,
} from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource, IsolationLevel } from 'typeorm';
import { Order } from './order.entity';
import { OrderItem } from '../order-items/order-item.entity';
import { Product } from '../products/product.entity';
import { User } from '../users/user.entity';
import { InventoryService } from '../inventory/inventory.service';
// NOTE(review): InventoryLog is used below but is not imported in this
// example — add its import from wherever that entity lives.

/**
 * Order creation and cancellation with transactions, row locks and retries.
 */
@Injectable()
export class OrdersService {
  constructor(
    // InjectDataSource is exported by @nestjs/typeorm, not @nestjs/common.
    @InjectDataSource()
    private dataSource: DataSource,
    private inventoryService: InventoryService,
  ) {}

  /**
   * Creates an order for a user, reserving stock for every item.
   *
   * @param userId - Id of the user placing the order.
   * @param items - Order lines; the same product may appear more than once.
   * @returns The saved order.
   * @throws BadRequestException when there are no items, a quantity is not
   *   positive, or stock is insufficient.
   * @throws NotFoundException when the user or any product does not exist.
   */
  async createOrder(
    userId: number,
    items: { productId: number; quantity: number }[],
  ): Promise<Order> {
    // An empty list would otherwise produce an empty `IN ()` clause.
    if (items.length === 0) {
      throw new BadRequestException('Order must contain at least one item');
    }

    // Total quantity requested per product. Repeated product ids are merged
    // so the existence and stock checks see the real demand.
    const requested = new Map<number, number>();
    for (const item of items) {
      // A non-positive quantity would add stock and lower the total.
      if (!(item.quantity > 0)) {
        throw new BadRequestException(
          `Invalid quantity for product ${item.productId}`,
        );
      }
      requested.set(
        item.productId,
        (requested.get(item.productId) ?? 0) + item.quantity,
      );
    }

    // Ascending ids give every transaction the same lock order.
    const productIds = [...requested.keys()].sort((a, b) => a - b);

    // The isolation level is a plain string first argument, not an options object.
    const isolationLevel: IsolationLevel = 'REPEATABLE READ';

    return this.withRetry(() =>
      this.dataSource.transaction(isolationLevel, async (manager) => {
        // Verify user
        const user = await manager.findOne(User, {
          where: { id: userId },
        });

        if (!user) {
          throw new NotFoundException('User not found');
        }

        // Lock and verify products
        const products = await manager
          .createQueryBuilder(Product, 'product')
          .setLock('pessimistic_write')
          .where('product.id IN (:...ids)', { ids: productIds })
          .orderBy('product.id', 'ASC')
          .getMany();

        const productsById = new Map<number, Product>();
        for (const product of products) {
          productsById.set(product.id, product);
        }

        if (productsById.size !== productIds.length) {
          throw new NotFoundException('Some products not found');
        }

        const productFor = (productId: number): Product => {
          const product = productsById.get(productId);
          if (!product) {
            throw new NotFoundException('Some products not found');
          }
          return product;
        };

        // Check stock against the summed demand per product
        for (const [productId, quantity] of requested) {
          const product = productFor(productId);
          if (product.stock < quantity) {
            throw new BadRequestException(
              `Insufficient stock for product ${product.name}`,
            );
          }
        }

        // Calculate total; one order item per input line
        let total = 0;
        const orderItems: Partial<OrderItem>[] = [];

        for (const item of items) {
          const product = productFor(item.productId);
          const itemTotal = product.price * item.quantity;
          total += itemTotal;

          orderItems.push({
            productId: product.id,
            quantity: item.quantity,
            price: product.price,
            total: itemTotal,
          });
        }

        // Create order
        const order = manager.create(Order, {
          userId,
          total,
          status: 'pending',
        });
        await manager.save(order);

        // Create order items
        await manager.insert(
          OrderItem,
          orderItems.map((item) => ({ ...item, orderId: order.id })),
        );

        // Update stock, once per product and in lock order
        for (const productId of productIds) {
          await manager.decrement(
            Product,
            { id: productId },
            'stock',
            requested.get(productId) ?? 0,
          );
        }

        // Log inventory change
        await manager.insert(InventoryLog, {
          orderId: order.id,
          action: 'order_created',
          timestamp: new Date(),
        });

        return order;
      }),
    );
  }

  /**
   * Cancels an order and puts its items back into stock.
   *
   * @param orderId - Id of the order to cancel.
   * @throws NotFoundException when the order does not exist.
   * @throws BadRequestException when the order is already cancelled or delivered.
   */
  async cancelOrder(orderId: number): Promise<void> {
    await this.withRetry(() =>
      this.dataSource.transaction(async (manager) => {
        // Lock order. The row lock is taken on the order alone, because
        // PostgreSQL rejects a locking clause on the nullable side of an
        // outer join; the items are loaded in a separate query below.
        const order = await manager.findOne(Order, {
          where: { id: orderId },
          lock: { mode: 'pessimistic_write' },
        });

        if (!order) {
          throw new NotFoundException('Order not found');
        }

        if (order.status === 'cancelled') {
          throw new BadRequestException('Order already cancelled');
        }

        if (order.status === 'delivered') {
          throw new BadRequestException('Cannot cancel delivered order');
        }

        // Restore stock, touching products in ascending id order like createOrder
        const orderItems = await manager.find(OrderItem, {
          where: { orderId: order.id },
        });
        const itemsInLockOrder = [...orderItems].sort(
          (a, b) => a.productId - b.productId,
        );

        for (const item of itemsInLockOrder) {
          await manager.increment(
            Product,
            { id: item.productId },
            'stock',
            item.quantity,
          );
        }

        // Update order status
        order.status = 'cancelled';
        await manager.save(order);

        // Log cancellation
        await manager.insert(InventoryLog, {
          orderId: order.id,
          action: 'order_cancelled',
          timestamp: new Date(),
        });
      }),
    );
  }

  /**
   * Runs `operation`, retrying with exponential backoff on retryable database errors.
   *
   * @param operation - Async work to run; called again from scratch on each retry.
   * @param maxRetries - Total number of attempts. Values below 1 (or NaN) are
   *   treated as a single attempt.
   * @returns The value resolved by the first successful attempt.
   * @throws The error of the last attempt, or immediately any non-retryable error.
   */
  private async withRetry<T>(
    operation: () => Promise<T>,
    maxRetries: number = 3,
  ): Promise<T> {
    // Always run at least once; otherwise the loop would be skipped and there
    // would be no error to throw.
    const attempts = maxRetries >= 1 ? Math.ceil(maxRetries) : 1;

    for (let attempt = 0; ; attempt++) {
      try {
        return await operation();
      } catch (error) {
        const isLastAttempt = attempt >= attempts - 1;

        if (isLastAttempt || !this.isRetryableError(error)) {
          throw error;
        }

        // 100ms, 200ms, 400ms, ...
        const delay = Math.pow(2, attempt) * 100;
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }

  /**
   * Tells whether a caught value carries one of the retryable error codes.
   *
   * @param error - Any caught value; it may be null or undefined.
   */
  private isRetryableError(error: any): boolean {
    const retryableCodes = ['40P01', '40001', '55P03']; // Deadlock, serialization, lock
    const code: unknown = error?.code;
    return typeof code === 'string' && retryableCodes.includes(code);
  }
}
Section titled “29.10 Summary” Transaction Quick Reference +------------------------------------------------------------------+ | | | Method | Usage | | -------------------|------------------------------------------| | dataSource.transaction()| Simple transaction with callback | | entityManager.transaction()| Same as above | | queryRunner | Manual transaction control | | | | Lock Modes | Description | | -------------------|------------------------------------------| | pessimistic_read | Shared lock | | pessimistic_write | Exclusive lock | | optimistic (version)| Version-based concurrency | | | | Isolation Levels | Use Case | | -------------------|------------------------------------------| | READ COMMITTED | Default, good for most cases | | REPEATABLE READ | When same data read multiple times | | SERIALIZABLE | Highest isolation, strict consistency | | | | Best Practices | Description | | -------------------|------------------------------------------| | Keep short | Minimize transaction duration | | Lock order | Prevent deadlocks | | Retry logic | Handle transient failures | | Error handling | Always rollback on error | | | +------------------------------------------------------------------+Next Chapter
Section titled “Next Chapter”Chapter 30: Migrations & Schema Management
Last Updated: February 2026