FILE LOCKING - Preventing concurrent file access
Python
#!/usr/bin/env python3
"""
FILE LOCKING - Preventing concurrent file access
Demonstrates file locking techniques for safe concurrent access
"""
import os
import tempfile
import time
# Banner: the title framed by two 60-character rules.
print("=" * 60, "FILE LOCKING - Safe Concurrent Access", "=" * 60, sep="\n")

# Every example below keeps its files in the system temporary directory.
temp_dir = tempfile.gettempdir()

# Example 1: Simple lock file pattern
print("\n1. Lock File Pattern", "-" * 40, sep="\n")

# The lock file sits next to the data file: same path plus ".lock".
data_file = os.path.join(temp_dir, "data.txt")
lock_file = f"{data_file}.lock"
def acquire_lock(lock_path, timeout=5, poll_interval=0.1):
    """Try to acquire a lock by exclusively creating ``lock_path``.

    The lock is an empty file created with ``O_CREAT | O_EXCL``, so the
    call fails with ``FileExistsError`` while the file already exists.
    On failure the creation is retried until ``timeout`` elapses.

    Args:
        lock_path: Path of the lock file to create.
        timeout: Maximum number of seconds to keep retrying.  ``0`` means
            a single attempt.
        poll_interval: Seconds to wait between attempts.

    Returns:
        True if the lock file was created by this call, False if the
        timeout expired first.
    """
    # A monotonic clock keeps the timeout correct even if the system
    # wall clock is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Try to create lock file exclusively
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            # Lock file exists: give up once the deadline has passed.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))
        else:
            # Only the file's existence matters; nothing is written to it.
            os.close(fd)
            return True
def release_lock(lock_path):
    """Release the lock by deleting ``lock_path``.

    A lock file that is already gone is silently ignored.

    Args:
        lock_path: Path of the lock file to delete.
    """
    # Delete first and handle the miss, rather than checking exists()
    # beforehand: the file could vanish between the check and the delete.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        pass
# Acquire lock
if acquire_lock(lock_file):
    print(" Lock acquired")
    try:
        # Do file operations
        with open(data_file, 'w') as f:
            f.write("Protected data\n")
        print(" File updated")
    finally:
        # Release lock even if the write above raised, so a failure does
        # not leave the lock file behind.
        release_lock(lock_file)
    print(" Lock released")
else:
    print(" Failed to acquire lock")
# Example 2: Lock file with PID
# Section heading followed by a 40-character rule.
print("\n2. Lock File with Process ID", "-" * 40, sep="\n")
def acquire_lock_with_pid(lock_path):
    """Create the lock file exclusively and record this process's PID in it.

    Args:
        lock_path: Path of the lock file to create.

    Returns:
        True if the lock file was created by this call.  False if it
        already existed; in that case the PID stored in the file is
        printed when it can be read and parsed.  An existing lock is only
        reported, never removed.
    """
    try:
        # Mode 'x' fails with FileExistsError if the file already exists.
        with open(lock_path, 'x') as f:
            f.write(f"{os.getpid()}\n")
    except FileExistsError:
        # Report who holds the lock, for debugging.
        try:
            with open(lock_path, 'r') as f:
                pid = int(f.read().strip())
        except (OSError, ValueError):
            # The file vanished, is unreadable, or does not contain an
            # integer: there is nothing useful to report.
            pass
        else:
            print(f" Lock held by PID: {pid}")
        return False
    return True
lock_file2 = os.path.join(temp_dir, "app.lock")
if acquire_lock_with_pid(lock_file2):
    print(" Lock acquired with PID")
    try:
        # Show what was recorded in the lock file.
        with open(lock_file2, 'r') as f:
            print(f" Lock content: PID {f.read().strip()}")
    finally:
        # Always delete the lock file, even if reading it back failed.
        os.remove(lock_file2)
    print(" Lock released")
# Example 3: Timestamp-based lock
# Section heading followed by a 40-character rule.
print("\n3. Timestamp-Based Lock (Stale Detection)", "-" * 40, sep="\n")
def acquire_lock_with_timeout(lock_path, max_age=10):
    """Acquire the lock, first removing an existing lock file that is stale.

    A lock file counts as stale when its modification time is more than
    ``max_age`` seconds in the past.

    Args:
        lock_path: Path of the lock file.
        max_age: Age in seconds beyond which an existing lock is removed.

    Returns:
        True if the lock file was created by this call, False if a
        non-stale lock file already exists.
    """
    # Ask for the mtime directly instead of testing exists() first: the
    # file may be removed by its owner between a check and the stat call.
    try:
        age = time.time() - os.path.getmtime(lock_path)
    except OSError:
        age = None  # no lock file we can inspect
    if age is not None and age > max_age:
        print(f" Removing stale lock (age: {age:.1f}s)")
        try:
            os.remove(lock_path)
        except FileNotFoundError:
            pass  # someone else already cleaned it up
    # NOTE(review): two processes that both judge the lock stale can still
    # race here (one may delete the lock the other just re-created).
    try:
        # Mode 'x' fails with FileExistsError if the file already exists.
        with open(lock_path, 'x') as f:
            f.write(f"Locked at {time.time()}\n")
    except FileExistsError:
        return False
    return True
# Demo: take the timestamped lock, report success, then delete the lock
# file again.  Nothing is printed here when the lock cannot be taken.
lock_file3 = os.path.join(temp_dir, "time_lock.lock")
if acquire_lock_with_timeout(lock_file3):
    print(" Lock acquired with timestamp")
    os.remove(lock_file3)
# Example 4: Read/Write lock pattern
print("\n4. Read/Write Lock Pattern", "-" * 40, sep="\n")

# The shared file and its two marker files live side by side; the markers
# are the shared file's path plus ".rlock" / ".wlock".
shared_file = os.path.join(temp_dir, "shared.txt")
read_lock = f"{shared_file}.rlock"
write_lock = f"{shared_file}.wlock"

# Initialize file
with open(shared_file, 'w') as f:
    f.write("Initial data\n")
# Reader
def read_operation():
    """Print the contents of ``shared_file`` while a read-lock marker exists.

    The marker file ``read_lock`` is created (or truncated) before the
    read and removed afterwards, whether or not the read succeeded.
    """
    # Create read lock
    # NOTE(review): mode 'w' is not exclusive, so every reader shares this
    # one marker file, and the write-lock marker is not consulted here.
    with open(read_lock, 'w') as f:
        f.write("reading\n")
    try:
        with open(shared_file, 'r') as f:
            data = f.read()
        print(f" Read: {data.strip()}")
    finally:
        # Another reader sharing the marker may have removed it already;
        # delete and tolerate the miss instead of checking exists() first.
        try:
            os.remove(read_lock)
        except FileNotFoundError:
            pass
# Writer
def write_operation(content):
    """Overwrite ``shared_file`` with ``content`` unless a read is in progress.

    Args:
        content: Text that replaces the file's contents.

    Returns:
        False if the read-lock marker exists (nothing is written),
        True after the file has been written.
    """
    # Check for read locks
    if os.path.exists(read_lock):
        print(" Cannot write: read in progress")
        return False
    # Create write lock
    # NOTE(review): mode 'w' is not exclusive, so this marker does not
    # keep a second writer out.
    with open(write_lock, 'w') as f:
        f.write("writing\n")
    try:
        with open(shared_file, 'w') as f:
            f.write(content)
        print(f" Wrote: {content.strip()}")
    finally:
        # Delete and tolerate the miss instead of checking exists() first,
        # so a concurrent removal cannot raise out of the cleanup.
        try:
            os.remove(write_lock)
        except FileNotFoundError:
            pass
    return True
# Run the demo in order: read the current contents, overwrite them, then
# read again so the second read shows what the write stored.
read_operation()
write_operation("Updated data\n")
read_operation()
# Example 5: Queue file pattern
print("\n5. Queue File Pattern (Atomic Operations)", "-" * 40, sep="\n")

# One directory holds the queue; each message is a separate file in it.
# exist_ok=True makes the call a no-op if the directory is already there.
queue_dir = os.path.join(temp_dir, "queue")
os.makedirs(queue_dir, exist_ok=True)
def enqueue(message):
    """Add ``message`` to the queue as a new ``msg_*`` file in ``queue_dir``.

    The message is written to a hidden temporary file first and then
    renamed, so a consumer listing ``msg_*`` files never sees a partially
    written message.

    Args:
        message: Text to store in the queue file.
    """
    # Per-process sequence number, kept on the function object.  It keeps
    # names unique and ordered when several messages get the same timestamp.
    seq = getattr(enqueue, "_seq", 0)
    enqueue._seq = seq + 1
    # A single timestamp for both names.  All fields are fixed-width and
    # zero-padded so sorting the names sorts by time, then PID, then
    # sequence; the PID keeps names from different processes distinct.
    unique = f"{time.time():017.6f}_{os.getpid():010d}_{seq:010d}"
    # Write to temp file first
    temp_file = os.path.join(queue_dir, f".tmp_{unique}")
    final_file = os.path.join(queue_dir, f"msg_{unique}.txt")
    with open(temp_file, 'w') as f:
        f.write(message)
    # Rename into place (an atomic operation on POSIX systems).
    os.rename(temp_file, final_file)
    print(f" Enqueued: {os.path.basename(final_file)}")
def dequeue():
    """Remove and return the next message from ``queue_dir``.

    Candidates are the files whose names start with ``msg_``, taken in
    sorted name order.  A message is claimed by renaming it before it is
    read, so when several consumers run at once each message is handed to
    only one of them.

    Returns:
        The message text, or None when no message could be claimed.
    """
    pending = sorted(n for n in os.listdir(queue_dir) if n.startswith('msg_'))
    for name in pending:
        # The claimed name does not start with 'msg_', so other consumers
        # stop seeing the file as soon as the rename succeeds.
        claimed = os.path.join(queue_dir, f".claimed_{os.getpid()}_{name}")
        try:
            os.rename(os.path.join(queue_dir, name), claimed)
        except FileNotFoundError:
            # Another consumer took this message first; try the next one.
            continue
        with open(claimed, 'r') as f:
            message = f.read()
        os.remove(claimed)
        print(f" Dequeued: {message.strip()}")
        return message
    return None
# Fill the queue with three numbered messages.
for _n in (1, 2, 3):
    enqueue(f"Message {_n}\n")

print("\nProcessing queue:")
# Drain the queue: dequeue() prints each message itself and returns None
# once nothing is left.
msg = dequeue()
while msg is not None:
    msg = dequeue()
# Example 6: Safe file update pattern
print("\n6. Safe File Update (Temp + Rename)", "-" * 40, sep="\n")

config_file = os.path.join(temp_dir, "config.txt")

# Initial config: two "key=value" lines.
with open(config_file, 'w') as f:
    f.writelines(["setting1=value1\n", "setting2=value2\n"])
def safe_update_file(filepath, new_content):
    """Replace the contents of ``filepath`` with ``new_content`` safely.

    The new content is written to a uniquely named temporary file next to
    ``filepath`` and then moved over it with ``os.replace``, so readers
    see either the old file or the complete new one.

    Args:
        filepath: Path of the file to update.
        new_content: Text to store in the file.

    Raises:
        Any exception raised while writing or replacing is propagated
        after the temporary file has been removed; ``filepath`` itself is
        left untouched in that case.
    """
    # A per-call unique name (PID + random suffix) so concurrent updaters
    # never write into each other's temporary file.
    temp_file = f"{filepath}.{os.getpid()}.{os.urandom(4).hex()}.tmp"
    # Mode 'x' refuses to reuse an existing file.
    f = open(temp_file, 'x')
    try:
        # Write to temp file
        with f:
            f.write(new_content)
            # Push the data to disk before the rename makes it visible.
            f.flush()
            os.fsync(f.fileno())
        # Rename over the original (atomic on POSIX systems).
        os.replace(temp_file, filepath)
    except BaseException:
        # Do not leave a half-written temporary file behind.
        try:
            os.remove(temp_file)
        except FileNotFoundError:
            pass
        raise
    print(" File updated safely")
# Show the file as it is before the update.
print("Original config:")
with open(config_file) as f:
    print(f" {f.read()}")

# Replacement contents: both settings changed and a third one added.
new_config = (
    "setting1=newvalue1\n"
    "setting2=newvalue2\n"
    "setting3=value3\n"
)
safe_update_file(config_file, new_config)

# Show the file again after the update.
print("Updated config:")
with open(config_file) as f:
    print(f" {f.read()}")
# Example 7: Lock context manager
# Section heading followed by a 40-character rule.
print("\n7. Lock Context Manager", "-" * 40, sep="\n")
class FileLock:
    """Context manager that holds a lock by exclusively creating a file.

    Entering creates ``lockfile`` with ``O_CREAT | O_EXCL``; leaving
    deletes it again.  There is no waiting: if the file already exists,
    entering fails immediately.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, lockfile):
        """Remember the lock file path; nothing is created yet."""
        # Path of the lock file.
        self.lockfile = lockfile
        # True only while this instance holds the lock.
        self.acquired = False

    def __enter__(self):
        """Create the lock file and return ``self``.

        Raises:
            RuntimeError: If the lock file already exists.
        """
        try:
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError as exc:
            # Leave ``acquired`` untouched: if this same instance already
            # holds the lock (nested ``with``), the outer block must still
            # release it on exit.
            raise RuntimeError(
                f"Could not acquire lock: {self.lockfile}"
            ) from exc
        # Only the file's existence matters; nothing is written to it.
        os.close(fd)
        self.acquired = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delete the lock file if this instance holds the lock.

        Returns None, so an exception raised inside the ``with`` body
        propagates to the caller.
        """
        if not self.acquired:
            return
        # Clear the flag first so a repeated __exit__ can never delete a
        # lock file that someone else created in the meantime.
        self.acquired = False
        try:
            os.remove(self.lockfile)
        except FileNotFoundError:
            pass  # already removed; nothing left to release
lock_file7 = os.path.join(temp_dir, "context.lock")
data_file7 = os.path.join(temp_dir, "data7.txt")

# Do the protected write while the lock is held; a RuntimeError (raised
# when the lock cannot be taken) is reported instead of propagating.
try:
    with FileLock(lock_file7):
        print(" Lock acquired via context manager")
        with open(data_file7, 'w') as f:
            f.write("Protected operation\n")
        print(" Operation completed")
except RuntimeError as err:
    print(f" {err}")
else:
    # Reached only after the with-block has exited without an exception.
    print(" Lock automatically released")
# Cleanup
# Remove every file and directory the examples created.  Each removal
# tolerates the target being gone already, rather than checking exists()
# first and racing with whatever removed it.
cleanup_files = [data_file, shared_file, config_file, data_file7]
cleanup_dirs = [queue_dir]
for f in cleanup_files:
    try:
        os.remove(f)
    except FileNotFoundError:
        pass
for d in cleanup_dirs:
    try:
        entries = os.listdir(d)
    except FileNotFoundError:
        continue  # directory is already gone
    # The directory must be empty before os.rmdir() can remove it.
    for f in entries:
        try:
            os.remove(os.path.join(d, f))
        except FileNotFoundError:
            pass
    os.rmdir(d)
# Closing summary: a blank line, a 60-character rule, the key points and
# a final rule, printed one item per line.
print(
    "\n" + "=" * 60,
    "Key Points:",
    " - Lock files prevent concurrent access",
    " - Use O_EXCL flag for atomic lock creation",
    " - Include PID/timestamp for debugging",
    " - Detect and remove stale locks",
    " - Temp + rename for atomic updates",
    "=" * 60,
    sep="\n",
)