# FILE SPLITTER - Splitting large files into smaller chunks
#!/usr/bin/env python3
"""
FILE SPLITTER - Splitting large files into smaller chunks
Demonstrates various file splitting strategies
"""

import os
import tempfile

# Banner, then build the shared test fixture: a 50-line sample file in
# the system temp directory that every splitting example below consumes.
print("=" * 60)
print("FILE SPLITTER - Dividing Files into Chunks")
print("=" * 60)

temp_dir = tempfile.gettempdir()

# Create a sample file to split (lines are numbered 01..50)
source_file = os.path.join(temp_dir, "large_file.txt")
with open(source_file, 'w') as f:
    f.writelines(
        f"Line {n:02d}: This is sample data for splitting demonstration\n"
        for n in range(1, 51)
    )

print(f"Created source file with 50 lines\n")

# Example 1: Split by line count
print("1. Split by Line Count (10 lines per file)")
print("-" * 40)
lines_per_file = 10
chunk_num = 1

# Read all lines once, then walk the list in fixed-size slices;
# the final slice may be shorter than lines_per_file.
with open(source_file, 'r') as src:
    all_lines = src.readlines()

for start in range(0, len(all_lines), lines_per_file):
    batch = all_lines[start:start + lines_per_file]
    chunk_file = os.path.join(temp_dir, f"chunk_{chunk_num}.txt")
    with open(chunk_file, 'w') as chunk:
        chunk.writelines(batch)
    print(f"  Created: {os.path.basename(chunk_file)} ({len(batch)} lines)")
    chunk_num += 1

# Example 2: Split by byte size
print("\n2. Split by Byte Size (200 bytes per file)")
print("-" * 40)
chunk_size = 200
chunk_num = 1

with open(source_file, 'rb') as src:
    # Two-argument iter() calls read() repeatedly until it returns the
    # sentinel b'' at end of file.
    for chunk_data in iter(lambda: src.read(chunk_size), b''):
        chunk_file = os.path.join(temp_dir, f"byte_chunk_{chunk_num}.txt")
        with open(chunk_file, 'wb') as chunk:
            chunk.write(chunk_data)
        print(f"  Created: {os.path.basename(chunk_file)} ({len(chunk_data)} bytes)")
        chunk_num += 1

# Example 3: Split into N equal parts
print("\n3. Split into 5 Equal Parts")
print("-" * 40)
num_parts = 5

# First pass: count total lines so the per-part quota can be computed.
with open(source_file, 'r') as f:
    total_lines = sum(1 for _ in f)

# Ceiling division without math.ceil: -(-a // b)
lines_per_part = -(-total_lines // num_parts)

with open(source_file, 'r') as src:
    for part_num in range(1, num_parts + 1):
        chunk_file = os.path.join(temp_dir, f"part_{part_num}_of_{num_parts}.txt")

        # Copy up to lines_per_part lines; readline() returns '' at EOF.
        with open(chunk_file, 'w') as chunk:
            copied = 0
            while copied < lines_per_part:
                line = src.readline()
                if line == '':
                    break
                chunk.write(line)
                copied += 1

        # Report non-empty parts; drop files created after EOF was reached.
        if os.path.getsize(chunk_file) == 0:
            os.remove(chunk_file)
        else:
            with open(chunk_file, 'r') as f:
                line_total = sum(1 for _ in f)
            print(f"  Created: {os.path.basename(chunk_file)} ({line_total} lines)")

# Example 4: Split by delimiter/pattern
print("\n4. Split by Pattern (Every 15 lines)")
print("-" * 40)
split_every = 15
chunk_num = 1

with open(source_file, 'r') as src:
    pending = []
    for text_line in src:
        pending.append(text_line)
        # Flush a chunk as soon as the batch reaches the split size.
        if len(pending) == split_every:
            chunk_file = os.path.join(temp_dir, f"split_{chunk_num}.txt")
            with open(chunk_file, 'w') as chunk:
                chunk.writelines(pending)
            print(f"  Created: {os.path.basename(chunk_file)} ({len(pending)} lines)")
            pending = []
            chunk_num += 1

    # A short final batch still gets its own file.
    if pending:
        chunk_file = os.path.join(temp_dir, f"split_{chunk_num}.txt")
        with open(chunk_file, 'w') as chunk:
            chunk.writelines(pending)
        print(f"  Created: {os.path.basename(chunk_file)} ({len(pending)} lines)")

# Example 5: Split with manifest file
print("\n5. Split with Manifest File")
print("-" * 40)
manifest_file = os.path.join(temp_dir, "manifest.txt")

lines_per_file = 12
chunk_num = 1
manifest_data = []

# Read everything, then emit 12-line slices; record each chunk's file
# name, line count and on-disk byte size for the manifest below.
with open(source_file, 'r') as src:
    source_lines = src.readlines()

for offset in range(0, len(source_lines), lines_per_file):
    piece = source_lines[offset:offset + lines_per_file]
    chunk_file = os.path.join(temp_dir, f"manifest_chunk_{chunk_num}.txt")
    with open(chunk_file, 'w') as chunk:
        chunk.writelines(piece)

    manifest_data.append({
        'file': os.path.basename(chunk_file),
        'lines': len(piece),
        'size': os.path.getsize(chunk_file),
    })
    chunk_num += 1

# Write manifest: one header section, then a per-chunk entry listing
# file name, line count and byte size; echo the result to stdout.
with open(manifest_file, 'w') as mf:
    mf.writelines([
        "SPLIT FILE MANIFEST\n",
        "=" * 60 + "\n",
        f"Source: {os.path.basename(source_file)}\n",
        f"Total chunks: {len(manifest_data)}\n\n",
    ])

    for i, info in enumerate(manifest_data, 1):
        mf.writelines([
            f"Chunk {i}:\n",
            f"  File: {info['file']}\n",
            f"  Lines: {info['lines']}\n",
            f"  Size: {info['size']} bytes\n\n",
        ])

print(f"Created manifest: {manifest_file}")
with open(manifest_file, 'r') as f:
    print(f.read())

# Example 6: Verify split can be reconstructed
print("\n6. Verify Split Files Can Be Merged Back")
print("-" * 40)
reconstructed = os.path.join(temp_dir, "reconstructed.txt")

# Concatenate the Example-1 chunks (chunk_1.txt .. chunk_5.txt) in order,
# skipping any that are missing.
with open(reconstructed, 'w') as output:
    for idx in range(1, 6):
        piece_path = os.path.join(temp_dir, f"chunk_{idx}.txt")
        if not os.path.exists(piece_path):
            continue
        with open(piece_path, 'r') as piece:
            output.write(piece.read())

# A byte-for-byte comparison against the original proves a lossless split.
with open(source_file, 'r') as orig, open(reconstructed, 'r') as recon:
    matched = orig.read() == recon.read()
print("  ✓ Reconstruction successful!" if matched else "  ✗ Reconstruction failed")

# Example 7: Split binary file
print("\n7. Split Binary File")
print("-" * 40)
binary_source = os.path.join(temp_dir, "binary.dat")

# Create a 100-byte binary file containing the byte values 0x00..0x63
with open(binary_source, 'wb') as f:
    f.write(bytes(range(100)))

chunk_size = 25
chunk_num = 1

with open(binary_source, 'rb') as src:
    # Sentinel-iter reads fixed-size slices until EOF yields b''.
    for chunk_data in iter(lambda: src.read(chunk_size), b''):
        chunk_file = os.path.join(temp_dir, f"binary_chunk_{chunk_num}.dat")
        with open(chunk_file, 'wb') as chunk:
            chunk.write(chunk_data)
        print(f"  Created: {os.path.basename(chunk_file)} ({len(chunk_data)} bytes)")
        chunk_num += 1

# Cleanup (remove only specific test files to avoid removing too many)
test_files = [source_file, reconstructed, binary_source, manifest_file]
for i in range(1, 10):
    test_files.extend([
        os.path.join(temp_dir, f"chunk_{i}.txt"),
        os.path.join(temp_dir, f"byte_chunk_{i}.txt"),
        os.path.join(temp_dir, f"part_{i}_of_{num_parts}.txt"),
        os.path.join(temp_dir, f"split_{i}.txt"),
        os.path.join(temp_dir, f"manifest_chunk_{i}.txt"),
        os.path.join(temp_dir, f"binary_chunk_{i}.dat"),
    ])

for f in test_files:
    if os.path.exists(f):
        os.remove(f)

# Closing banner: recap the splitting techniques demonstrated above.
print("\n" + "=" * 60)
print("Key Points:")
for point in ("Split by line count or byte size",
              "Split into N equal parts",
              "Create manifest for tracking",
              "Verify by reconstruction",
              "Works for text and binary files"):
    print(f"  - {point}")
print("=" * 60)