End-to-End Encryption in Python: Building Secure Messaging Systems

Security Cryptography Python Networking
← Back to Blog

Introduction

In an era of mass surveillance and data breaches, end-to-end encryption (E2EE) is no longer optionalβ€”it's essential. This post walks through CipherChat, a CLI-based secure messaging system I built using hybrid AES-256 + RSA-2048 cryptography.

You'll learn the fundamentals of cryptographic protocols, key exchange mechanisms, and how to implement production-grade encryption in Python.

Why Hybrid Encryption?

Neither symmetric nor asymmetric encryption alone is ideal for messaging:

Symmetric Encryption (AES)

Asymmetric Encryption (RSA)

The Hybrid Solution

Combine both approaches:

  1. Use RSA to encrypt a random AES key
  2. Use AES to encrypt the actual message
  3. Send encrypted AES key + encrypted message
  4. Receiver decrypts AES key with their RSA private key
  5. Then decrypts message with recovered AES key
This is the same approach used by Signal, WhatsApp, and other E2EE messaging apps.

Architecture Overview

CipherChat implements a client-server architecture with the following security properties:

Implementation: RSA Key Generation

First, each client generates an RSA-2048 key pair:

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

def generate_rsa_keypair():
    """Generate RSA-2048 public/private key pair"""
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )
    public_key = private_key.public_key()
    
    # Serialize for storage/transmission
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    
    public_pem = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    
    return private_pem, public_pem

Why RSA-2048?

RSA-2048 provides 112 bits of security, equivalent to AES-128. While RSA-4096 is more secure, it's significantly slower with minimal practical benefit for most applications.

Secure Key Exchange Protocol

The key exchange happens when two clients want to start a conversation:

Step 1: Public Key Distribution

class KeyExchange:
    def __init__(self, own_private_key, own_public_key):
        self.private_key = own_private_key
        self.public_key = own_public_key
        self.peer_public_keys = {}  # username -> public_key
    
    def register_peer(self, username, public_key_pem):
        """Verify and store peer's public key"""
        # Load public key
        public_key = serialization.load_pem_public_key(public_key_pem)
        
        # Calculate fingerprint for verification
        fingerprint = sha256(public_key_pem).hexdigest()[:16]
        print(f"Fingerprint for {username}: {fingerprint}")
        
        # In production, user should verify this out-of-band
        self.peer_public_keys[username] = public_key

Step 2: Encrypt Session Key

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
import os

def encrypt_session_key(session_key, recipient_public_key):
    """Encrypt AES session key with recipient's RSA public key"""
    encrypted_key = recipient_public_key.encrypt(
        session_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
    return encrypted_key

# Generate random AES-256 key for this session
session_key = os.urandom(32)  # 256 bits

Message Encryption with AES-256

Once we have a shared session key, we use AES-256 in GCM mode for authenticated encryption:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os

class MessageEncryption:
    def __init__(self, session_key):
        self.cipher = AESGCM(session_key)
    
    def encrypt_message(self, plaintext):
        """Encrypt message with AES-256-GCM"""
        # Generate unique nonce (IV) for each message
        nonce = os.urandom(12)  # GCM standard nonce size
        
        # Encrypt and authenticate
        ciphertext = self.cipher.encrypt(
            nonce,
            plaintext.encode('utf-8'),
            None  # No additional authenticated data
        )
        
        # Return nonce + ciphertext
        return nonce + ciphertext
    
    def decrypt_message(self, encrypted_data):
        """Decrypt AES-256-GCM encrypted message"""
        # Split nonce and ciphertext
        nonce = encrypted_data[:12]
        ciphertext = encrypted_data[12:]
        
        # Decrypt and verify authenticity
        plaintext = self.cipher.decrypt(nonce, ciphertext, None)
        return plaintext.decode('utf-8')

Why AES-GCM?

GCM (Galois/Counter Mode) provides both confidentiality and authenticity:

TCP Socket Communication

CipherChat uses encrypted TCP sockets for real-time messaging:

Server Implementation

import socket
import threading

class SecureServer:
    def __init__(self, host='0.0.0.0', port=5555):
        self.host = host
        self.port = port
        self.clients = {}  # socket -> username
        self.public_keys = {}  # username -> public_key
    
    def start(self):
        """Start listening for connections"""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen()
        
        print(f"[*] Server listening on {self.host}:{self.port}")
        
        while True:
            client_socket, address = server.accept()
            print(f"[+] Connection from {address}")
            
            # Handle each client in separate thread
            thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket,)
            )
            thread.daemon = True
            thread.start()
    
    def handle_client(self, client_socket):
        """Handle individual client connection"""
        try:
            # Receive username and public key
            data = client_socket.recv(4096)
            username, public_key = self.parse_registration(data)
            
            self.clients[client_socket] = username
            self.public_keys[username] = public_key
            
            # Broadcast public key to all clients
            self.broadcast_public_key(username, public_key)
            
            # Listen for encrypted messages
            while True:
                encrypted_msg = client_socket.recv(4096)
                if not encrypted_msg:
                    break
                
                # Forward encrypted message (server can't decrypt)
                self.forward_message(username, encrypted_msg)
        
        except Exception as e:
            print(f"[!] Error: {e}")
        finally:
            self.disconnect_client(client_socket)

Client Implementation

class SecureClient:
    def __init__(self, server_host, server_port, username):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.username = username
        self.private_key, self.public_key = generate_rsa_keypair()
        self.session_keys = {}  # peer_username -> AES key
        
    def connect(self, server_host, server_port):
        """Connect to server and register"""
        self.socket.connect((server_host, server_port))
        
        # Send registration data
        reg_data = self.create_registration_packet()
        self.socket.send(reg_data)
        
        # Start listening thread
        thread = threading.Thread(target=self.receive_messages)
        thread.daemon = True
        thread.start()
    
    def send_message(self, recipient, message):
        """Encrypt and send message to recipient"""
        # Get or create session key
        if recipient not in self.session_keys:
            session_key = os.urandom(32)
            self.session_keys[recipient] = session_key
            
            # Encrypt session key with recipient's public key
            encrypted_key = encrypt_session_key(
                session_key,
                self.peer_public_keys[recipient]
            )
            
            # Send encrypted key first
            key_packet = self.create_key_packet(recipient, encrypted_key)
            self.socket.send(key_packet)
        
        # Encrypt message with session key
        encryptor = MessageEncryption(self.session_keys[recipient])
        encrypted_msg = encryptor.encrypt_message(message)
        
        # Create and send message packet
        msg_packet = self.create_message_packet(recipient, encrypted_msg)
        self.socket.send(msg_packet)

Message Integrity with HMAC

To prevent tampering, each message includes an HMAC (Hash-based Message Authentication Code):

import hmac
import hashlib

def create_hmac(session_key, message):
    """Generate SHA-256 HMAC for message"""
    return hmac.new(
        session_key,
        message,
        hashlib.sha256
    ).digest()

def verify_hmac(session_key, message, received_hmac):
    """Verify message HMAC"""
    expected_hmac = create_hmac(session_key, message)
    return hmac.compare_digest(expected_hmac, received_hmac)

# In message sending:
def send_authenticated_message(message, session_key):
    encrypted = encrypt_message(message, session_key)
    mac = create_hmac(session_key, encrypted)
    return encrypted + mac  # Encrypt-then-MAC pattern

Encrypt-then-MAC

Always MAC the ciphertext, not the plaintext. This prevents certain classes of attacks and provides slightly stronger security guarantees.

Multi-User Chat Room

Supporting group chats requires careful key management:

class GroupChat:
    def __init__(self, room_name):
        self.room_name = room_name
        self.members = set()
        self.ephemeral_key = os.urandom(32)  # Shared room key
    
    def add_member(self, username, public_key):
        """Add member and send them the room key"""
        self.members.add(username)
        
        # Encrypt room key with member's public key
        encrypted_room_key = encrypt_session_key(
            self.ephemeral_key,
            public_key
        )
        
        # Send encrypted key to new member
        send_encrypted_key(username, encrypted_room_key)
    
    def broadcast_message(self, sender, message):
        """Encrypt message with room key and broadcast"""
        encryptor = MessageEncryption(self.ephemeral_key)
        encrypted = encryptor.encrypt_message(message)
        
        # Send to all members except sender
        for member in self.members:
            if member != sender:
                send_to_user(member, encrypted)

Security Considerations

1. Forward Secrecy

Generate new session keys periodically to limit exposure if keys are compromised:

def rotate_session_key(self, recipient):
    """Rotate session key every N messages or T seconds"""
    new_key = os.urandom(32)
    self.send_new_session_key(recipient, new_key)
    self.session_keys[recipient] = new_key

2. Replay Attack Prevention

Include message sequence numbers and timestamps:

class MessagePacket:
    def __init__(self, sender, recipient, encrypted_data):
        self.sender = sender
        self.recipient = recipient
        self.data = encrypted_data
        self.timestamp = int(time.time())
        self.sequence_num = self.get_next_sequence()
    
    def verify(self, expected_sequence):
        """Verify sequence number and timestamp"""
        time_diff = abs(time.time() - self.timestamp)
        if time_diff > 300:  # 5 minute window
            raise SecurityError("Message too old")
        
        if self.sequence_num != expected_sequence:
            raise SecurityError("Invalid sequence number")

3. Denial of Service Protection

Performance Optimizations

Connection Pooling

class ConnectionPool:
    def __init__(self, max_connections=100):
        self.pool = []
        self.max_connections = max_connections
        self.lock = threading.Lock()
    
    def get_connection(self):
        with self.lock:
            if self.pool:
                return self.pool.pop()
            return self.create_connection()
    
    def release_connection(self, conn):
        with self.lock:
            if len(self.pool) < self.max_connections:
                self.pool.append(conn)

Async I/O with asyncio

For better scalability, use asynchronous networking:

import asyncio

async def handle_client_async(reader, writer):
    """Async client handler"""
    try:
        while True:
            data = await reader.read(4096)
            if not data:
                break
            
            # Process message asynchronously
            response = await process_message(data)
            writer.write(response)
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()

Testing & Validation

Security testing checklist:

Deployment & Production Hardening

For production use:

  1. Use TLS: Wrap sockets in TLS/SSL for transport security
  2. Key storage: Encrypt private keys at rest with password-derived keys
  3. Logging: Log security events without exposing sensitive data
  4. Updates: Plan for protocol version upgrades
  5. Auditing: Regular security audits and penetration testing

Open Source: CipherChat on GitHub

The complete implementation is available:

github.com/DNSdecoded/CipherChat

Includes:

Conclusion

Building secure systems requires understanding both cryptographic primitives and practical engineering considerations. CipherChat demonstrates that with careful design, you can create production-grade encrypted communication systems using Python's cryptography libraries.

Key takeaways:

Interested in cryptography or secure systems? Let's connect!

← Back to Blog