# Skip to content
#
# FILE LOCKING - Preventing concurrent file access
#
# Python
#!/usr/bin/env python3
"""
FILE LOCKING - Preventing concurrent file access
Demonstrates file locking techniques for safe concurrent access
"""

import os
import tempfile
import time

# Banner for the whole demo, emitted as a single call (same three lines).
print("=" * 60, "FILE LOCKING - Safe Concurrent Access", "=" * 60, sep="\n")

# Every file the demo creates lives in the system temp directory.
temp_dir = tempfile.gettempdir()

# Example 1: Simple lock file pattern
print("\n1. Lock File Pattern", "-" * 40, sep="\n")
# The lock file sits next to the data file it guards, with a ".lock" suffix.
data_file = os.path.join(temp_dir, "data.txt")
lock_file = f"{data_file}.lock"

def acquire_lock(lock_path, timeout=5):
    """Try to acquire a lock by exclusively creating a lock file.

    Polls roughly every 0.1 seconds until the lock file can be created or
    the timeout has elapsed.

    Args:
        lock_path: Path of the lock file to create.
        timeout: Maximum number of seconds to keep retrying. A value of 0
            (or less) makes a single attempt and never sleeps.

    Returns:
        True if the lock file was created by this call, False if it still
        existed when the timeout ran out.
    """
    # A monotonic clock keeps the timeout correct even when the wall clock
    # is adjusted (NTP step, manual change) while we are waiting.
    deadline = time.monotonic() + timeout

    while True:
        try:
            # O_CREAT | O_EXCL makes "check and create" a single atomic step:
            # the call fails if the file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            # Lock file exists: somebody else holds the lock.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(0.1, remaining))
        else:
            # Only the file's existence matters, not an open descriptor.
            os.close(fd)
            return True

def release_lock(lock_path):
    """Release a lock by deleting its lock file.

    Releasing a lock whose file is already gone is a no-op.

    Args:
        lock_path: Path of the lock file to remove.
    """
    # Remove first and tolerate absence, instead of checking existence and
    # then removing: the file can vanish between those two steps.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        pass

# Acquire lock
if acquire_lock(lock_file):
    print("  Lock acquired")

    try:
        # Do file operations
        with open(data_file, 'w') as f:
            f.write("Protected data\n")
        print("  File updated")
    finally:
        # Release the lock even if the write fails; otherwise the leftover
        # lock file would make every later acquire attempt time out.
        release_lock(lock_file)
    print("  Lock released")
else:
    print("  Failed to acquire lock")

# Example 2: Lock file with PID
print("\n2. Lock File with Process ID")
print("-" * 40)

def acquire_lock_with_pid(lock_path):
    """Create a lock file exclusively and record this process's PID in it.

    Makes a single attempt and does not wait. When the lock is already
    held, the holder's PID is printed if it can be read from the file.
    No staleness check is performed.

    Args:
        lock_path: Path of the lock file to create.

    Returns:
        True if the lock file was created by this call, False if it
        already existed.
    """
    try:
        # Mode 'x' fails with FileExistsError instead of overwriting.
        with open(lock_path, 'x') as f:
            f.write(f"{os.getpid()}\n")
        return True
    except FileExistsError:
        # Report who holds the lock (diagnostic only).
        try:
            with open(lock_path, 'r') as f:
                pid = int(f.read().strip())
            print(f"  Lock held by PID: {pid}")
        except (OSError, ValueError):
            # Best effort: the holder may have just released the lock (file
            # gone) or not yet written its PID (empty or partial content).
            # Other exceptions, such as KeyboardInterrupt, now propagate.
            pass
        return False

lock_file2 = os.path.join(temp_dir, "app.lock")

if acquire_lock_with_pid(lock_file2):
    print("  Lock acquired with PID")

    try:
        with open(lock_file2, 'r') as f:
            print(f"  Lock content: PID {f.read().strip()}")
    finally:
        # Drop the lock even if reading it back fails, so a failed run does
        # not leave a lock file that blocks the next one.
        os.remove(lock_file2)
    print("  Lock released")

# Example 3: Timestamp-based lock
print("\n3. Timestamp-Based Lock (Stale Detection)")
print("-" * 40)

def acquire_lock_with_timeout(lock_path, max_age=10):
    """Acquire a lock file, first discarding an existing one that is stale.

    A lock file counts as stale when its modification time is more than
    ``max_age`` seconds in the past. Makes a single attempt and does not
    wait.

    Args:
        lock_path: Path of the lock file to create.
        max_age: Age in seconds beyond which an existing lock file is
            removed before the acquire attempt.

    Returns:
        True if the lock file was created by this call, False if a
        non-stale lock file already exists.
    """
    # Ask for the mtime directly instead of checking existence first: the
    # holder may release the lock between a check and the stat call.
    try:
        age = time.time() - os.path.getmtime(lock_path)
    except FileNotFoundError:
        age = None  # No lock file right now.

    if age is not None and age > max_age:
        print(f"  Removing stale lock (age: {age:.1f}s)")
        try:
            os.remove(lock_path)
        except FileNotFoundError:
            # Someone else removed it first; the exclusive create below
            # still decides who actually gets the lock.
            pass

    try:
        with open(lock_path, 'x') as f:
            f.write(f"Locked at {time.time()}\n")
        return True
    except FileExistsError:
        return False

lock_file3 = os.path.join(temp_dir, "time_lock.lock")

# Take the lock, report it, and drop it again right away.
if acquire_lock_with_timeout(lock_file3):
    print("  Lock acquired with timestamp")
    os.remove(lock_file3)

# Example 4: Read/Write lock pattern
print("\n4. Read/Write Lock Pattern", "-" * 40, sep="\n")
shared_file = os.path.join(temp_dir, "shared.txt")
# Marker files next to the shared file signal an active reader or writer.
read_lock = f"{shared_file}.rlock"
write_lock = f"{shared_file}.wlock"

# Initialize file
with open(shared_file, 'w') as f:
    print("Initial data", file=f)

# Reader
def read_operation():
    """Print the shared file's contents while a read-lock marker exists.

    The marker file at ``read_lock`` is created before the read and removed
    afterwards, even if the read fails. ``write_operation`` refuses to
    write while the marker exists.
    """
    # Create read lock
    with open(read_lock, 'w') as f:
        f.write("reading\n")

    try:
        with open(shared_file, 'r') as f:
            data = f.read()
            print(f"  Read: {data.strip()}")
    finally:
        # All readers share this one marker file, so another reader may
        # already have removed it; checking existence first and then
        # removing could still fail between the two steps.
        try:
            os.remove(read_lock)
        except FileNotFoundError:
            pass

# Writer
def write_operation(content):
    """Overwrite the shared file with ``content`` under a write lock.

    The write is refused while a read-lock marker exists or while another
    writer holds the write lock.

    Args:
        content: Text that replaces the shared file's contents.

    Returns:
        True if the file was written, False if the write was refused.
    """
    # Check for read locks
    if os.path.exists(read_lock):
        print("  Cannot write: read in progress")
        return False

    # Create write lock. Mode 'x' creates the file exclusively, so only one
    # writer at a time can hold the lock; mode 'w' would let a second writer
    # silently overwrite the first writer's lock file and proceed.
    try:
        with open(write_lock, 'x') as f:
            f.write("writing\n")
    except FileExistsError:
        print("  Cannot write: write in progress")
        return False

    try:
        with open(shared_file, 'w') as f:
            f.write(content)
            print(f"  Wrote: {content.strip()}")
    finally:
        # Release the write lock even if the write failed.
        try:
            os.remove(write_lock)
        except FileNotFoundError:
            pass

    return True

# Demo run: read, overwrite, then read again to show the new contents.
read_operation()
write_operation("Updated data\n")
read_operation()

# Example 5: Queue file pattern
print("\n5. Queue File Pattern (Atomic Operations)", "-" * 40, sep="\n")
# Each queued message is stored as its own file inside this directory.
queue_dir = os.path.join(temp_dir, "queue")
os.makedirs(queue_dir, exist_ok=True)

def enqueue(message):
    """Add a message to the queue directory as a new ``msg_*`` file.

    The message is written to a hidden temp file first and then renamed, so
    a consumer listing the directory never sees a partially written message.

    Args:
        message: Text to store in the new queue file.
    """
    # Per-process sequence number. Two calls within one clock tick would
    # otherwise get the same file name, and the second rename would replace
    # (or fail on) the first message.
    enqueue._seq = getattr(enqueue, "_seq", 0) + 1

    # One timestamp shared by both names. The fixed-width formatting keeps
    # lexicographic file-name order equal to chronological order; the PID
    # separates concurrent producers.
    stem = f"{time.time():017.6f}_{os.getpid()}_{enqueue._seq:06d}"

    # Write to temp file first
    temp_file = os.path.join(queue_dir, f".tmp_{stem}")
    final_file = os.path.join(queue_dir, f"msg_{stem}.txt")

    with open(temp_file, 'w') as f:
        f.write(message)

    # Atomic rename
    os.rename(temp_file, final_file)
    print(f"  Enqueued: {os.path.basename(final_file)}")

def dequeue():
    """Remove and return the next message from the queue directory.

    Candidates are the files whose names start with ``msg_``, taken in
    sorted name order. A file is claimed by renaming it before it is read,
    so two consumers cannot both process the same message.

    Returns:
        The message text, or None if no message could be claimed.
    """
    names = sorted(n for n in os.listdir(queue_dir) if n.startswith('msg_'))

    for name in names:
        # Claim the message by renaming it to a name private to this
        # process. Only one consumer's rename of a given file can succeed.
        claimed = os.path.join(queue_dir, f".claimed_{os.getpid()}_{name}")
        try:
            os.rename(os.path.join(queue_dir, name), claimed)
        except FileNotFoundError:
            # Another consumer took this message first; try the next one.
            continue

        with open(claimed, 'r') as f:
            message = f.read()

        # Delete only after a successful read, so a failed read leaves the
        # claimed file on disk instead of losing the message.
        os.remove(claimed)
        print(f"  Dequeued: {message.strip()}")
        return message

    return None

enqueue("Message 1\n")
enqueue("Message 2\n")
enqueue("Message 3\n")

print("\nProcessing queue:")
# Drain the queue; dequeue() returns None once no message files remain.
msg = dequeue()
while msg is not None:
    msg = dequeue()

# Example 6: Safe file update pattern
print("\n6. Safe File Update (Temp + Rename)", "-" * 40, sep="\n")
config_file = os.path.join(temp_dir, "config.txt")

# Initial config
with open(config_file, 'w') as f:
    f.writelines(["setting1=value1\n", "setting2=value2\n"])

def safe_update_file(filepath, new_content):
    """Replace a file's contents atomically via a temp file and rename.

    Readers see either the complete old contents or the complete new
    contents, never a partially written file. If anything fails, the
    target file is left untouched and the temp file is removed.

    Args:
        filepath: Path of the file to replace (created if missing).
        new_content: Text to store in the file.
    """
    # The PID in the name keeps concurrent updaters in different processes
    # from writing to the same temp file.
    temp_file = f"{filepath}.{os.getpid()}.tmp"

    try:
        # Write to temp file
        with open(temp_file, 'w') as f:
            f.write(new_content)
            # Push the data to disk before the rename makes it visible, so
            # a crash cannot leave an empty or truncated target file.
            f.flush()
            os.fsync(f.fileno())

        # Atomic rename (replaces original)
        os.replace(temp_file, filepath)
    except BaseException:
        # Do not leave a stray temp file behind on failure.
        try:
            os.remove(temp_file)
        except FileNotFoundError:
            pass
        raise

    print("  File updated safely")

# Show the file before the update.
print("Original config:")
with open(config_file, 'r') as f:
    print("  " + f.read())

new_config = (
    "setting1=newvalue1\n"
    "setting2=newvalue2\n"
    "setting3=value3\n"
)
safe_update_file(config_file, new_config)

# Show the file after the update.
print("Updated config:")
with open(config_file, 'r') as f:
    print("  " + f.read())

# Example 7: Lock context manager
print("\n7. Lock Context Manager", "-" * 40, sep="\n")

class FileLock:
    """Context manager that holds an exclusive lock file for a ``with`` body.

    Entering creates the lock file atomically and raises RuntimeError if it
    already exists. Exiting removes the lock file. No waiting or retrying
    is done.
    """

    def __init__(self, lockfile):
        """Remember the lock file path; nothing is created yet."""
        self.lockfile = lockfile
        # True only while this instance currently holds the lock.
        self.acquired = False

    def __enter__(self):
        """Create the lock file exclusively and return this instance.

        Raises:
            RuntimeError: If the lock file already exists.
        """
        try:
            # O_CREAT | O_EXCL fails if the file exists, so checking and
            # creating happen as one atomic step.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            self.acquired = False
            raise RuntimeError(f"Could not acquire lock: {self.lockfile}")
        os.close(fd)
        self.acquired = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock if held; exceptions from the body propagate."""
        if not self.acquired:
            return
        # Reset the flag so it reflects reality after the release.
        self.acquired = False
        try:
            os.remove(self.lockfile)
        except FileNotFoundError:
            # The lock file was already removed externally; nothing to do.
            pass

lock_file7 = os.path.join(temp_dir, "context.lock")
data_file7 = os.path.join(temp_dir, "data7.txt")

# The lock is held for the with-body only; a failed acquire is reported.
try:
    with FileLock(lock_file7):
        print("  Lock acquired via context manager")
        with open(data_file7, 'w') as f:
            print("Protected operation", file=f)
        print("  Operation completed")
    print("  Lock automatically released")
except RuntimeError as err:
    print(f"  {err}")

# Cleanup
cleanup_files = [data_file, shared_file, config_file, data_file7]
cleanup_dirs = [queue_dir]

# Delete the demo's data files that still exist.
for path in cleanup_files:
    if os.path.exists(path):
        os.remove(path)

# Empty each demo directory, then remove the directory itself.
for directory in cleanup_dirs:
    if not os.path.exists(directory):
        continue
    for entry in os.listdir(directory):
        os.remove(os.path.join(directory, entry))
    os.rmdir(directory)

print("\n" + "=" * 60)
print(
    "Key Points:",
    "  - Lock files prevent concurrent access",
    "  - Use O_EXCL flag for atomic lock creation",
    "  - Include PID/timestamp for debugging",
    "  - Detect and remove stale locks",
    "  - Temp + rename for atomic updates",
    sep="\n",
)
print("=" * 60)