Introduction
In an era of mass surveillance and data breaches, end-to-end encryption (E2EE) is no longer optional; it's essential. This post walks through CipherChat, a CLI-based secure messaging system I built using hybrid AES-256 + RSA-2048 cryptography.
You'll learn the fundamentals of cryptographic protocols, key exchange mechanisms, and how to implement production-grade encryption in Python.
Why Hybrid Encryption?
Neither symmetric nor asymmetric encryption alone is ideal for messaging:
Symmetric Encryption (AES)
- ✅ Fast: Encrypts gigabytes per second on CPUs with hardware AES support
- ✅ Secure: AES-256 has no known practical attacks
- ❌ Key distribution problem: How do you securely share the key?
Asymmetric Encryption (RSA)
- ✅ No shared secret: Public keys can be distributed openly
- ✅ Digital signatures: Prove message authenticity
- ❌ Slow: 100-1000x slower than AES
- ❌ Size limits: Can only encrypt small payloads (at most a couple of hundred bytes with RSA-2048)
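To make the size limit concrete: with OAEP padding and SHA-256, an RSA key of k bytes can encrypt at most k - 2*32 - 2 bytes, which is 190 bytes for RSA-2048. The sketch below checks this with the `cryptography` package; the helper name `max_oaep_payload` is my own, for illustration only.

```python
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa


def max_oaep_payload(key_size_bits, hash_len_bytes=32):
    """Largest plaintext (in bytes) RSA-OAEP can encrypt for a given key size."""
    return key_size_bits // 8 - 2 * hash_len_bytes - 2


private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
oaep = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

limit = max_oaep_payload(2048)

# A payload exactly at the limit encrypts to a single RSA block.
ciphertext = public_key.encrypt(b"x" * limit, oaep)

# One byte more is rejected by the library.
try:
    public_key.encrypt(b"x" * (limit + 1), oaep)
    oversized_rejected = False
except ValueError:
    oversized_rejected = True
```

A 32-byte AES-256 key fits comfortably under this limit, which is why RSA is used only for the key and never for the message itself.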
The Hybrid Solution
Combine both approaches:
- Use RSA to encrypt a random AES key
- Use AES to encrypt the actual message
- Send encrypted AES key + encrypted message
- Receiver decrypts AES key with their RSA private key
- Then decrypts message with recovered AES key
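This hybrid pattern (an asymmetric step to establish a key, then fast symmetric encryption for the data) is standard in systems such as OpenPGP. Signal and WhatsApp follow the same high-level idea, but they replace RSA key transport with elliptic-curve Diffie-Hellman key agreement (X3DH and the Double Ratchet).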
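The five steps above can be sketched as a single round trip. This is a minimal illustration using the `cryptography` package, not CipherChat's actual wire format; the function names `hybrid_encrypt` and `hybrid_decrypt` are hypothetical.

```python
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)


def hybrid_encrypt(plaintext, recipient_public_key):
    """Return (wrapped_key, nonce, ciphertext) for the recipient."""
    aes_key = AESGCM.generate_key(bit_length=256)  # fresh random AES-256 key
    nonce = os.urandom(12)
    ciphertext = AESGCM(aes_key).encrypt(nonce, plaintext, None)
    wrapped_key = recipient_public_key.encrypt(aes_key, OAEP)
    return wrapped_key, nonce, ciphertext


def hybrid_decrypt(wrapped_key, nonce, ciphertext, recipient_private_key):
    """Unwrap the AES key with RSA, then decrypt the message with AES-GCM."""
    aes_key = recipient_private_key.decrypt(wrapped_key, OAEP)
    return AESGCM(aes_key).decrypt(nonce, ciphertext, None)


recipient_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
wrapped, nonce, ct = hybrid_encrypt(b"hello, CipherChat", recipient_key.public_key())
recovered = hybrid_decrypt(wrapped, nonce, ct, recipient_key)
```

Only the 32-byte AES key passes through the slow RSA operation; the message, whatever its size, is handled by AES.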
Architecture Overview
CipherChat implements a client-server architecture with the following security properties:
- Zero-knowledge server: Server cannot read messages
- Forward secrecy (partial): Rotating session keys limits how many past messages a compromised session key exposes
- Message authenticity: AES-GCM authentication tags and HMAC-SHA256 detect tampering
- Identity authentication: Public key fingerprints verify users
Implementation: RSA Key Generation
First, each client generates an RSA-2048 key pair:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

def generate_rsa_keypair():
    """Generate RSA-2048 public/private key pair"""
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )
    public_key = private_key.public_key()

    # Serialize for storage/transmission
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    public_pem = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    return private_pem, public_pem
Why RSA-2048?
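RSA-2048 provides roughly 112 bits of security, somewhat below the 128 bits of AES-128 (RSA-3072 is the approximate match for AES-128). That makes the RSA step, not AES-256, the weakest link in this design. While RSA-4096 is more secure, it's significantly slower, and 2048 bits remains acceptable for most applications today.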
Secure Key Exchange Protocol
The key exchange happens when two clients want to start a conversation:
Step 1: Public Key Distribution
from hashlib import sha256
from cryptography.hazmat.primitives import serialization

class KeyExchange:
    def __init__(self, own_private_key, own_public_key):
        self.private_key = own_private_key
        self.public_key = own_public_key
        self.peer_public_keys = {}  # username -> public_key

    def register_peer(self, username, public_key_pem):
        """Verify and store peer's public key"""
        # Load public key
        public_key = serialization.load_pem_public_key(public_key_pem)
        # Calculate fingerprint for verification (first 64 bits of SHA-256)
        fingerprint = sha256(public_key_pem).hexdigest()[:16]
        print(f"Fingerprint for {username}: {fingerprint}")
        # In production, user should verify this out-of-band
        self.peer_public_keys[username] = public_key
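A fingerprint is easier to compare out-of-band when it is grouped into short blocks and computed over a canonical encoding of the key. The following is a sketch under my own assumptions (DER encoding, eight groups of four hex characters); the helper `key_fingerprint` is hypothetical and not part of CipherChat.

```python
import hashlib

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa


def key_fingerprint(public_key, groups=8):
    """Hex fingerprint of a public key, grouped for reading aloud."""
    der = public_key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashlib.sha256(der).hexdigest()
    # Keep the first `groups` blocks of 4 hex characters.
    return " ".join(digest[i:i + 4] for i in range(0, groups * 4, 4))


key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
fp = key_fingerprint(key.public_key())

# Re-loading the same key from PEM yields the same fingerprint,
# because the hash covers the DER bytes rather than the PEM text.
pem = key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
fp_again = key_fingerprint(serialization.load_pem_public_key(pem))
```

Hashing the DER bytes avoids mismatches caused by PEM whitespace or line-ending differences. Showing more groups makes it harder for an attacker to craft a key with a matching prefix.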
Step 2: Encrypt Session Key
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
import os

def encrypt_session_key(session_key, recipient_public_key):
    """Encrypt AES session key with recipient's RSA public key"""
    encrypted_key = recipient_public_key.encrypt(
        session_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
    return encrypted_key

# Generate random AES-256 key for this session
session_key = os.urandom(32)  # 256 bits
Message Encryption with AES-256
Once we have a shared session key, we use AES-256 in GCM mode for authenticated encryption:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os

class MessageEncryption:
    def __init__(self, session_key):
        self.cipher = AESGCM(session_key)

    def encrypt_message(self, plaintext):
        """Encrypt message with AES-256-GCM"""
        # Generate unique nonce (IV) for each message
        nonce = os.urandom(12)  # GCM standard nonce size
        # Encrypt and authenticate
        ciphertext = self.cipher.encrypt(
            nonce,
            plaintext.encode('utf-8'),
            None  # No additional authenticated data
        )
        # Return nonce + ciphertext
        return nonce + ciphertext

    def decrypt_message(self, encrypted_data):
        """Decrypt AES-256-GCM encrypted message"""
        # Split nonce and ciphertext
        nonce = encrypted_data[:12]
        ciphertext = encrypted_data[12:]
        # Decrypt and verify authenticity
        plaintext = self.cipher.decrypt(nonce, ciphertext, None)
        return plaintext.decode('utf-8')
Why AES-GCM?
GCM (Galois/Counter Mode) provides both confidentiality and authenticity:
- Encryption prevents eavesdropping
- Authentication tag prevents tampering
- Highly efficient (hardware-accelerated on modern CPUs)
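The tamper-detection property is easy to observe directly. In this small sketch (the message text is made up), flipping a single bit of the ciphertext makes decryption fail with `InvalidTag` instead of returning corrupted plaintext.

```python
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(key)
nonce = os.urandom(12)
message = b"transfer 10 credits"

ciphertext = aesgcm.encrypt(nonce, message, None)
# GCM appends a 16-byte authentication tag to the ciphertext.
tag_overhead = len(ciphertext) - len(message)

# Flip one bit in the first ciphertext byte, as an attacker on the wire might.
tampered = bytes([ciphertext[0] ^ 0x01]) + ciphertext[1:]

try:
    aesgcm.decrypt(nonce, tampered, None)
    tamper_detected = False
except InvalidTag:
    tamper_detected = True
```

Callers should treat `InvalidTag` as a hard failure and drop the packet.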
TCP Socket Communication
CipherChat sends its end-to-end encrypted payloads over ordinary TCP sockets for real-time messaging:
Server Implementation
import socket
import threading

class SecureServer:
    # Helper methods (parse_registration, broadcast_public_key,
    # forward_message, disconnect_client) are omitted here for brevity.
    def __init__(self, host='0.0.0.0', port=5555):
        self.host = host
        self.port = port
        self.clients = {}  # socket -> username
        self.public_keys = {}  # username -> public_key

    def start(self):
        """Start listening for connections"""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen()
        print(f"[*] Server listening on {self.host}:{self.port}")
        while True:
            client_socket, address = server.accept()
            print(f"[+] Connection from {address}")
            # Handle each client in separate thread
            thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket,)
            )
            thread.daemon = True
            thread.start()

    def handle_client(self, client_socket):
        """Handle individual client connection"""
        try:
            # Receive username and public key
            # Note: TCP is a byte stream, so one recv() call is not
            # guaranteed to return exactly one complete packet.
            data = client_socket.recv(4096)
            username, public_key = self.parse_registration(data)
            self.clients[client_socket] = username
            self.public_keys[username] = public_key
            # Broadcast public key to all clients
            self.broadcast_public_key(username, public_key)
            # Listen for encrypted messages
            while True:
                encrypted_msg = client_socket.recv(4096)
                if not encrypted_msg:
                    break
                # Forward encrypted message (server can't decrypt)
                self.forward_message(username, encrypted_msg)
        except Exception as e:
            print(f"[!] Error: {e}")
        finally:
            self.disconnect_client(client_socket)
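Because TCP delivers a byte stream rather than discrete messages, a single `recv(4096)` can return half a packet or two packets glued together. One common remedy is length-prefixed framing. The sketch below is an assumption about how that could look, not CipherChat's actual protocol; `send_frame`, `recv_frame`, and `MAX_FRAME` are hypothetical names.

```python
import socket
import struct

MAX_FRAME = 64 * 1024  # hypothetical upper bound on a single packet


def send_frame(sock, payload):
    """Send one message prefixed with its length as a 4-byte big-endian integer."""
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_exact(sock, n):
    """Read exactly n bytes, or raise if the peer closes early."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed mid-frame")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_frame(sock):
    """Receive one length-prefixed message."""
    (length,) = struct.unpack("!I", recv_exact(sock, 4))
    if length > MAX_FRAME:
        raise ValueError(f"frame of {length} bytes exceeds limit")
    return recv_exact(sock, length)


# Two back-to-back sends arrive as two distinct messages.
a, b = socket.socketpair()
send_frame(a, b"first")
send_frame(a, b"second message")
received = [recv_frame(b), recv_frame(b)]
a.close()
b.close()
```

The length check also doubles as a basic defense against oversized packets.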
Client Implementation
import os
import socket
import threading

class SecureClient:
    def __init__(self, server_host, server_port, username):
        self.server_host = server_host
        self.server_port = server_port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.username = username
        # generate_rsa_keypair() returns PEM-encoded bytes
        self.private_key, self.public_key = generate_rsa_keypair()
        self.peer_public_keys = {}  # peer_username -> loaded public key object
        self.session_keys = {}  # peer_username -> AES key

    def connect(self):
        """Connect to server and register"""
        self.socket.connect((self.server_host, self.server_port))
        # Send registration data
        reg_data = self.create_registration_packet()
        self.socket.sendall(reg_data)
        # Start listening thread
        thread = threading.Thread(target=self.receive_messages)
        thread.daemon = True
        thread.start()

    def send_message(self, recipient, message):
        """Encrypt and send message to recipient"""
        # Get or create session key
        if recipient not in self.session_keys:
            session_key = os.urandom(32)
            self.session_keys[recipient] = session_key
            # Encrypt session key with recipient's public key
            encrypted_key = encrypt_session_key(
                session_key,
                self.peer_public_keys[recipient]
            )
            # Send encrypted key first
            key_packet = self.create_key_packet(recipient, encrypted_key)
            self.socket.sendall(key_packet)
        # Encrypt message with session key
        encryptor = MessageEncryption(self.session_keys[recipient])
        encrypted_msg = encryptor.encrypt_message(message)
        # Create and send message packet
        msg_packet = self.create_message_packet(recipient, encrypted_msg)
        self.socket.sendall(msg_packet)
Message Integrity with HMAC
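AES-GCM already authenticates every message, so a separate MAC is only strictly needed if you swap in a non-authenticated mode such as AES-CBC or AES-CTR. In that case, attach an HMAC (Hash-based Message Authentication Code) to each message to prevent tampering: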
import hmac
import hashlib

def create_hmac(session_key, message):
    """Generate SHA-256 HMAC for message"""
    return hmac.new(
        session_key,
        message,
        hashlib.sha256
    ).digest()

def verify_hmac(session_key, message, received_hmac):
    """Verify message HMAC"""
    expected_hmac = create_hmac(session_key, message)
    # compare_digest runs in constant time to avoid timing leaks
    return hmac.compare_digest(expected_hmac, received_hmac)

# In message sending:
def send_authenticated_message(message, session_key):
    # MessageEncryption is the AES-GCM class defined earlier
    encrypted = MessageEncryption(session_key).encrypt_message(message)
    # Simplification: ideally the MAC key is separate from the encryption key
    mac = create_hmac(session_key, encrypted)
    return encrypted + mac  # Encrypt-then-MAC pattern
Encrypt-then-MAC
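Always MAC the ciphertext, not the plaintext. The receiver can then verify the MAC before decrypting, so tampered data is rejected without ever reaching the decryption code. This closes off attacks such as padding oracles that exploit how a decryptor reacts to modified ciphertext.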
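On the receiving side, the pattern means "verify first, decrypt second". The sketch below shows only the MAC layer and treats the ciphertext as opaque bytes; `seal`, `open_sealed`, and the separate `mac_key` are illustrative assumptions, not CipherChat's actual code.

```python
import hashlib
import hmac
import os

TAG_LEN = 32  # HMAC-SHA256 output size in bytes


def seal(mac_key, ciphertext):
    """Append an HMAC-SHA256 tag computed over the ciphertext."""
    tag = hmac.new(mac_key, ciphertext, hashlib.sha256).digest()
    return ciphertext + tag


def open_sealed(mac_key, packet):
    """Verify the tag and return the ciphertext; raise before any decryption."""
    if len(packet) < TAG_LEN:
        raise ValueError("packet too short")
    ciphertext, tag = packet[:-TAG_LEN], packet[-TAG_LEN:]
    expected = hmac.new(mac_key, ciphertext, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, tag):
        raise ValueError("MAC check failed")
    return ciphertext


mac_key = os.urandom(32)  # assumed to be separate from the encryption key
ciphertext = b"\x8f\x02opaque-ciphertext-bytes"
packet = seal(mac_key, ciphertext)
```

Only the value returned by `open_sealed` should ever be handed to the decryption routine.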
Multi-User Chat Room
Supporting group chats requires careful key management:
import os

# send_encrypted_key() and send_to_user() are transport helpers defined elsewhere.
class GroupChat:
    def __init__(self, room_name):
        self.room_name = room_name
        self.members = set()
        self.ephemeral_key = os.urandom(32)  # Shared room key

    def add_member(self, username, public_key):
        """Add member and send them the room key"""
        self.members.add(username)
        # Encrypt room key with member's public key
        encrypted_room_key = encrypt_session_key(
            self.ephemeral_key,
            public_key
        )
        # Send encrypted key to new member
        send_encrypted_key(username, encrypted_room_key)

    def broadcast_message(self, sender, message):
        """Encrypt message with room key and broadcast"""
        encryptor = MessageEncryption(self.ephemeral_key)
        encrypted = encryptor.encrypt_message(message)
        # Send to all members except sender
        for member in self.members:
            if member != sender:
                send_to_user(member, encrypted)
Security Considerations
1. Forward Secrecy
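Generate new session keys periodically to limit exposure if a session key is compromised. Note that rotation alone is not full forward secrecy: each new key is still wrapped with the recipient's long-term RSA key, so anyone who records the traffic and later obtains that private key can recover every session key. Full forward secrecy requires an ephemeral key agreement such as ECDH.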
def rotate_session_key(self, recipient):
    """Rotate session key every N messages or T seconds"""
    new_key = os.urandom(32)
    self.send_new_session_key(recipient, new_key)
    self.session_keys[recipient] = new_key
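The "every N messages or T seconds" rule needs a little bookkeeping per conversation. Here is one way it might be tracked; the class name `RotationPolicy` and the default thresholds are placeholders I chose for illustration.

```python
import time


class RotationPolicy:
    """Decide when a session key is due for rotation."""

    def __init__(self, max_messages=100, max_age_seconds=600, clock=time.monotonic):
        self.max_messages = max_messages
        self.max_age_seconds = max_age_seconds
        self.clock = clock  # injectable so the policy can be tested without sleeping
        self.reset()

    def reset(self):
        """Call after installing a fresh session key."""
        self.message_count = 0
        self.created_at = self.clock()

    def record_message(self):
        """Call once for every message sent under the current key."""
        self.message_count += 1

    def should_rotate(self):
        too_many = self.message_count >= self.max_messages
        too_old = self.clock() - self.created_at >= self.max_age_seconds
        return too_many or too_old
```

A client would call `record_message()` after each send, check `should_rotate()` before the next one, and call `reset()` once the new key is installed.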
2. Replay Attack Prevention
Include message sequence numbers and timestamps:
import time

class SecurityError(Exception):
    """Raised when a packet fails a freshness or ordering check"""

class MessagePacket:
    def __init__(self, sender, recipient, encrypted_data, sequence_num):
        self.sender = sender
        self.recipient = recipient
        self.data = encrypted_data
        self.timestamp = int(time.time())
        # Per-conversation counter supplied by the sender
        self.sequence_num = sequence_num

    def verify(self, expected_sequence):
        """Verify sequence number and timestamp"""
        time_diff = abs(time.time() - self.timestamp)
        if time_diff > 300:  # 5 minute window
            raise SecurityError("Message too old")
        if self.sequence_num != expected_sequence:
            raise SecurityError("Invalid sequence number")
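The receiver has to remember which sequence number it expects from each sender. A minimal sketch of that state, assuming sequence numbers start at 0 and increase by one per message (the `ReplayGuard` class is hypothetical):

```python
import time


class ReplayGuard:
    """Track the next expected sequence number for each sender."""

    def __init__(self, max_age_seconds=300):
        self.max_age_seconds = max_age_seconds
        self.next_sequence = {}  # sender -> next expected sequence number

    def accept(self, sender, sequence_num, timestamp, now=None):
        """Return True if the packet is fresh and in order, else False."""
        now = time.time() if now is None else now
        if abs(now - timestamp) > self.max_age_seconds:
            return False  # outside the freshness window
        if sequence_num != self.next_sequence.get(sender, 0):
            return False  # replayed, duplicated, or out of order
        self.next_sequence[sender] = sequence_num + 1
        return True
```

These checks only help if the sequence number and timestamp are themselves authenticated, for example by passing them as the associated data argument of AES-GCM. Otherwise an attacker can simply rewrite them.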
3. Denial of Service Protection
- Rate limiting: Max N connections per IP
- Message size limits: Reject oversized packets
- Resource quotas: Per-user memory/CPU limits
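The first two items can be sketched with a sliding-window counter and a size check. The limits below are arbitrary placeholders, and `ConnectionLimiter` and `packet_size_ok` are hypothetical names rather than CipherChat's real implementation.

```python
import time
from collections import defaultdict, deque

MAX_PACKET_BYTES = 64 * 1024  # hypothetical packet size cap


class ConnectionLimiter:
    """Allow at most `limit` new connections per IP within `window` seconds."""

    def __init__(self, limit=10, window=60.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.recent = defaultdict(deque)  # ip -> timestamps of recent connections

    def allow(self, ip):
        now = self.clock()
        timestamps = self.recent[ip]
        # Drop entries that have aged out of the window.
        while timestamps and now - timestamps[0] >= self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False
        timestamps.append(now)
        return True


def packet_size_ok(packet):
    """Reject packets larger than the configured cap."""
    return len(packet) <= MAX_PACKET_BYTES
```

A server would call `allow(address[0])` right after `accept()` and close the socket immediately when it returns False.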
Performance Optimizations
Connection Pooling
import threading

class ConnectionPool:
    # create_connection() is assumed to be implemented elsewhere.
    def __init__(self, max_connections=100):
        self.pool = []
        self.max_connections = max_connections
        self.lock = threading.Lock()

    def get_connection(self):
        with self.lock:
            if self.pool:
                return self.pool.pop()
        return self.create_connection()

    def release_connection(self, conn):
        with self.lock:
            if len(self.pool) < self.max_connections:
                self.pool.append(conn)
Async I/O with asyncio
For better scalability, use asynchronous networking:
import asyncio

async def handle_client_async(reader, writer):
    """Async client handler"""
    try:
        while True:
            data = await reader.read(4096)
            if not data:
                break
            # Process message asynchronously
            response = await process_message(data)
            writer.write(response)
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()
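A handler like this is typically registered with `asyncio.start_server`. The sketch below wires one up on an ephemeral localhost port and sends a single message through it; a plain echo handler stands in for `process_message`, which is not shown in this post.

```python
import asyncio


async def handle_echo(reader, writer):
    """Stand-in handler: echoes each chunk back to the client."""
    try:
        while True:
            data = await reader.read(4096)
            if not data:
                break
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()


async def demo():
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"ping")
        await writer.drain()
        reply = await reader.readexactly(4)
        writer.close()
        await writer.wait_closed()
    return reply

# Run with: asyncio.run(demo())
```

A long-running server would call `await server.serve_forever()` inside the `async with` block instead of connecting to itself.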
Testing & Validation
Security testing checklist:
- ✅ Man-in-the-middle attack resistance
- ✅ Replay attack prevention
- ✅ Forward secrecy verification
- ✅ Message tampering detection
- ✅ Large message handling (>1MB)
- ✅ Connection interruption recovery
- ✅ Concurrent user stress testing
Deployment & Production Hardening
For production use:
- Use TLS: Wrap sockets in TLS/SSL for transport security
- Key storage: Encrypt private keys at rest with password-derived keys
- Logging: Log security events without exposing sensitive data
- Updates: Plan for protocol version upgrades
- Auditing: Regular security audits and penetration testing
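For the key storage item, the `cryptography` package can encrypt a private key under a password at serialization time. This sketch shows one option, using `BestAvailableEncryption`; the function names and the sample password are placeholders.

```python
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa


def export_private_key(private_key, password):
    """Serialize a private key to PEM, encrypted under a password."""
    return private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(password),
    )


def import_private_key(pem_bytes, password):
    """Load a password-protected PEM private key."""
    return serialization.load_pem_private_key(pem_bytes, password=password)


key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
pem = export_private_key(key, b"correct horse battery staple")
restored = import_private_key(pem, b"correct horse battery staple")
```

In practice the password would come from user input or an OS keychain rather than a literal in the source.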
Open Source: CipherChat on GitHub
The complete implementation is available:
github.com/DNSdecoded/CipherChat
Includes:
- Full client and server implementation
- Multi-user chat rooms
- File transfer support
- CLI interface with rich formatting
- Comprehensive test suite
Conclusion
Building secure systems requires understanding both cryptographic primitives and practical engineering considerations. CipherChat demonstrates that with careful design, you can create production-grade encrypted communication systems using Python's cryptography libraries.
Key takeaways:
- Use hybrid encryption (RSA + AES) for performance and security
- Implement proper key exchange protocols
- Always use authenticated encryption (GCM or Encrypt-then-MAC)
- Design for forward secrecy and replay attack prevention
- Test thoroughly against known attack vectors
Interested in cryptography or secure systems? Let's connect!