Bulk Loading

Bulk loading with NodeSet and RelationshipSet is Graphio's high-performance approach for loading large datasets into Neo4j. This guide covers advanced patterns, optimization techniques, and best practices.

Overview

Bulk loading is ideal for:

  • ETL processes - Loading data from external sources
  • Data migration - Moving data between Neo4j instances
  • Initial data loading - Populating a fresh database
  • Batch processing - Regular data updates from feeds

Key advantages:

  • High performance - Automatic batching (10,000 nodes/relationships per batch)
  • Memory efficient - Processes data in chunks
  • Simple API - Define containers, add data, load
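
As a rough illustration of what batching means here, the plain-Python sketch below splits an iterable into fixed-size chunks. It is not Graphio's internal code: the helper name `iter_batches` is hypothetical, and the default size of 10,000 simply mirrors the figure quoted above.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int = 10_000) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size items (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        # islice pulls the next batch_size items without materializing the rest
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 25,000 fake node property dicts, produced lazily by a generator
nodes = ({"id": i} for i in range(25_000))
batch_sizes = [len(batch) for batch in iter_batches(nodes)]
print(batch_sizes)  # [10000, 10000, 5000]
```

Because only one batch is held in memory at a time, the same pattern works for sources that do not fit in memory as a whole.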

NodeSet: Bulk Node Loading

Basic Usage

from graphio import NodeSet
from neo4j import GraphDatabase

driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))

# Define container for Person nodes
people = NodeSet(['Person'], merge_keys=['email'])

# Add nodes
people.add({'name': 'Alice Smith', 'email': 'alice@example.com', 'age': 30})
people.add({'name': 'Bob Johnson', 'email': 'bob@example.com', 'age': 25})

# Bulk load to Neo4j
people.create(driver)

Multiple Labels

Assign multiple labels to all nodes in a NodeSet:

employees = NodeSet(['Person', 'Employee'], merge_keys=['employee_id'])
employees.add({'name': 'Alice', 'employee_id': 'E001', 'department': 'Engineering'})

Additional Labels

Use additional_labels to add extra labels to nodes after a MERGE, without including them in the MERGE pattern. This is useful when multiple datasets produce nodes with the same identity but you want to tag some of them with a more specific label.

# Merge on the shared label
items = NodeSet(['Item'], merge_keys=['id'])
items.add({'id': '1', 'name': 'First'})
items.merge(driver)
# Merge on the same label and key, but add an extra label
special_items = NodeSet(['Item'], merge_keys=['id'], additional_labels=['Featured'])
special_items.add({'id': '1', 'name': 'First', 'priority': 'high'})
special_items.merge(driver)

-- Generated Cypher
MERGE (n:Item { id: properties.id })
ON CREATE SET n = properties
ON MATCH SET n += properties
SET n:Featured

Because additional_labels are applied via SET after the MERGE, they accumulate. If the same node is merged from different NodeSets with different additional_labels, the node ends up with all of them.

Tip

additional_labels also works with create(). In that case, the additional labels are included directly in the CREATE clause.

Merge Keys and Uniqueness

Merge keys define node uniqueness for MERGE operations:

# Single merge key
users = NodeSet(['User'], merge_keys=['username'])

# Multiple merge keys (compound uniqueness)
locations = NodeSet(['Location'], merge_keys=['country', 'city'])
locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1500000})

Performance Tip

Always create indexes for merge keys before bulk loading:

locations.create_index(driver)  # Creates indexes for country and city

Default Properties

Apply properties to all nodes automatically:

# All employees get a default company and active flag
employees = NodeSet(
    ['Person', 'Employee'], 
    merge_keys=['employee_id'],
    default_props={'company': 'ACME Corp', 'active': True}
)

employees.add({'name': 'Alice', 'employee_id': 'E001'})
# Result: Alice gets company='ACME Corp' and active=True automatically

Handling Duplicates

Option 1: Allow duplicates (fastest)

people.add({'name': 'Alice', 'email': 'alice@example.com'})
people.add({'name': 'Alice', 'email': 'alice@example.com'})  # Duplicate allowed
len(people.nodes)  # Returns 2

Option 2: Built-in deduplication (fast)

# Enable deduplication at NodeSet creation
people = NodeSet(['Person'], merge_keys=['email'], deduplicate=True)

people.add({'name': 'Alice', 'email': 'alice@example.com'})
people.add({'name': 'Alice', 'email': 'alice@example.com'})  # Skipped automatically
len(people.nodes)  # Returns 1

# Override deduplication with force parameter
people.add({'name': 'Alice', 'email': 'alice@example.com'}, force=True)  # Added despite duplicate
len(people.nodes)  # Returns 2

Performance Note

The built-in deduplicate=True option uses an efficient internal index and is suitable for large datasets.
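
To make the idea of an internal index concrete, here is a minimal sketch of one way such deduplication could work: a set of merge-key tuples that is checked before each append. The class `DedupBuffer` is a toy stand-in written for this guide, not Graphio's NodeSet, and the real implementation may differ.

```python
from typing import Any, Dict, List, Set, Tuple


class DedupBuffer:
    """Toy container that skips nodes whose merge-key values were already seen."""

    def __init__(self, merge_keys: List[str]) -> None:
        self.merge_keys = merge_keys
        self.nodes: List[Dict[str, Any]] = []
        self._seen: Set[Tuple[Any, ...]] = set()

    def add(self, props: Dict[str, Any], force: bool = False) -> bool:
        """Append props unless its key tuple is known; return True if it was added."""
        key = tuple(props.get(k) for k in self.merge_keys)
        if key in self._seen and not force:
            return False
        self._seen.add(key)
        self.nodes.append(props)
        return True


buffer = DedupBuffer(["country", "city"])
buffer.add({"country": "Germany", "city": "Munich", "population": 1500000})
buffer.add({"country": "Germany", "city": "Berlin", "population": 3700000})
buffer.add({"country": "Germany", "city": "Munich", "population": 1600000})  # skipped
print(len(buffer.nodes))  # 2
```

A set lookup is constant time on average, which is why this style of check stays cheap as the container grows.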

Advanced Deduplication Patterns

Mixed deduplication within the same NodeSet:

# Process multiple data sources with different deduplication needs
customers = NodeSet(['Customer'], merge_keys=['email'], deduplicate=True)

# Source A: Clean data, allow deduplication
for record in clean_customer_data:
    customers.add(record)  # Automatically deduplicated

# Source B: Known to have intentional duplicates (e.g., for counting)
for record in duplicate_tracking_data:
    customers.add(record, force=True)  # Force add despite duplicates

Multiple merge keys deduplication:

# Deduplicate on compound keys
locations = NodeSet(['Location'], merge_keys=['country', 'city'], deduplicate=True)

locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1500000})
locations.add({'country': 'Germany', 'city': 'Berlin', 'population': 3700000})
locations.add({'country': 'Germany', 'city': 'Munich', 'population': 1600000})  # Skipped - same country+city

len(locations.nodes)  # Returns 2 (Munich and Berlin)

Batch operations with deduplication:

# add_nodes() also respects deduplication settings
people = NodeSet(['Person'], merge_keys=['email'], deduplicate=True)

batch_data = [
    {'name': 'Alice', 'email': 'alice@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
    {'name': 'Alice', 'email': 'alice@example.com'},  # Will be skipped
    {'name': 'Charlie', 'email': 'charlie@example.com'}
]

people.add_nodes(batch_data)  # Automatically deduplicates during batch add
len(people.nodes)  # Returns 3 (Alice, Bob, Charlie)


RelationshipSet: Bulk Relationship Loading

Basic Usage

from graphio import RelationshipSet

# Define relationship container
employments = RelationshipSet(
    'WORKS_AT',           # Relationship type
    ['Person'],           # Start node labels
    ['Company'],          # End node labels
    ['email'],            # Start node matching properties
    ['name']              # End node matching properties
)

# Add relationships
employments.add(
    {'email': 'alice@example.com'},    # Start node matcher
    {'name': 'ACME Corp'},             # End node matcher  
    {'position': 'Developer', 'since': '2023-01-15'}  # Relationship properties
)

# Bulk load
employments.create(driver)

Multiple Match Properties

Match nodes using multiple properties:

# Match locations by country AND city
visits = RelationshipSet(
    'VISITED',
    ['Person'], ['Location'],
    ['email'], ['country', 'city']
)

visits.add(
    {'email': 'alice@example.com'},
    {'country': 'Germany', 'city': 'Munich'},
    {'date': '2023-06-15', 'duration_days': 5}
)

Default Relationship Properties

survey_responses = RelationshipSet(
    'RATED', ['Person'], ['Product'],
    ['user_id'], ['product_id'],
    default_props={'survey': 'Q3-2023', 'method': 'online'}
)

# Every relationship gets survey and method automatically
survey_responses.add(
    {'user_id': 'U001'},
    {'product_id': 'P456'},
    {'rating': 5}
)

Append Properties on Relationships

Just like NodeSet, RelationshipSet supports append_props to accumulate values in a list across multiple merges. This is useful when the same relationship is merged from different data sources and you want to track all values instead of overwriting.

# Track sources that contributed to each relationship
citations = RelationshipSet(
    'CITES', ['Paper'], ['Paper'],
    ['doi'], ['doi'],
    append_props=['source']
)

citations.add({'doi': '10.1/a'}, {'doi': '10.1/b'}, {'source': 'pubmed', 'score': 0.9})
citations.merge(driver)
# Result: source=['pubmed'], score=0.9

citations2 = RelationshipSet(
    'CITES', ['Paper'], ['Paper'],
    ['doi'], ['doi'],
    append_props=['source']
)
citations2.add({'doi': '10.1/a'}, {'doi': '10.1/b'}, {'source': 'scopus', 'score': 0.95})
citations2.merge(driver)
# Result: source=['pubmed', 'scopus'], score=0.95 (score overwritten, source appended)

APOC Required

append_props on RelationshipSet uses apoc.map.removeKeys(), same as NodeSet.
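
The append-versus-overwrite behavior described in this section can be modeled in plain Python. The sketch below is only a model of the documented outcome (appended properties grow a list, all other properties are overwritten); the function `merge_props` is hypothetical and does not reproduce the Cypher that Graphio generates. Whether repeated values are deduplicated is not stated here, so the sketch keeps them.

```python
from typing import Any, Dict, Optional, Sequence


def merge_props(
    existing: Optional[Dict[str, Any]],
    incoming: Dict[str, Any],
    append_props: Sequence[str],
) -> Dict[str, Any]:
    """Model a merge: append listed properties, overwrite everything else."""
    merged = dict(existing or {})
    for key, value in incoming.items():
        if key in append_props:
            current = merged.get(key, [])
            current = current if isinstance(current, list) else [current]
            new_values = value if isinstance(value, list) else [value]
            merged[key] = current + new_values
        else:
            merged[key] = value
    return merged


first = merge_props(None, {"source": "pubmed", "score": 0.9}, ["source"])
second = merge_props(first, {"source": "scopus", "score": 0.95}, ["source"])
print(second)  # {'source': ['pubmed', 'scopus'], 'score': 0.95}
```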


Performance Optimization

Index Strategy

Always create indexes before bulk loading:

# For NodeSets
people.create_index(driver)        # Creates index on email
companies.create_index(driver)     # Creates index on name

# For RelationshipSets  
employments.create_index(driver)   # Creates indexes on both Person(email) and Company(name)

Compound indexes for multiple merge keys:

locations = NodeSet(['Location'], merge_keys=['country', 'city'])
locations.create_index(driver)  # Creates compound index on (country, city)

Loading Order Matters

Load nodes before relationships:

# ✅ Correct order
companies.create(driver)      # Load companies first
people.create(driver)         # Then people
employments.create(driver)    # Finally relationships

# ❌ Wrong order - relationships are silently skipped when their nodes don't exist yet
employments.create(driver)    # Relationships first - BAD!
people.create(driver)
companies.create(driver)

Create vs Merge Operations

Create Operation

  • Speed: Fastest - no uniqueness checks
  • Use case: Fresh databases, known unique data
  • Behavior: Always creates new nodes/relationships

# Creates all nodes, even duplicates
people.create(driver)

Merge Operation

  • Speed: Slower - checks for existing data
  • Use case: Updating existing data, uncertain about duplicates
  • Behavior: Updates existing, creates new

# Updates existing nodes based on merge_keys, creates new ones
people.merge(driver)

Advanced Merge Options

APOC Required

The preserve and append_props features require the APOC library to be installed in your Neo4j database. These features use apoc.map.removeKeys() and other APOC utilities.

Preserve specific properties during merge:

# Don't overwrite 'created_date' and 'original_source' on existing nodes
people.merge(driver, preserve=['created_date', 'original_source'])

# Equivalent Cypher behavior:
# ON CREATE SET n += properties
# ON MATCH SET n += apoc.map.removeKeys(properties, ['created_date', 'original_source'])
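
The effect of preserve can also be pictured without a database. The following sketch models the ON CREATE / ON MATCH split shown above using dictionaries; `merge_preserving` is a hypothetical helper for illustration, not part of Graphio.

```python
from typing import Any, Dict, Optional, Sequence


def merge_preserving(
    existing: Optional[Dict[str, Any]],
    incoming: Dict[str, Any],
    preserve: Sequence[str],
) -> Dict[str, Any]:
    """Model a merge where preserved keys are only written on creation."""
    if existing is None:
        # ON CREATE: every incoming property is stored
        return dict(incoming)
    # ON MATCH: drop preserved keys from the update, then apply the rest
    updates = {k: v for k, v in incoming.items() if k not in preserve}
    return {**existing, **updates}


stored = merge_preserving(
    None,
    {"email": "alice@example.com", "name": "Alice", "created_date": "2023-01-01"},
    ["created_date"],
)
updated = merge_preserving(
    stored,
    {"email": "alice@example.com", "name": "Alice Smith", "created_date": "2024-06-01"},
    ["created_date"],
)
print(updated["created_date"], updated["name"])  # 2023-01-01 Alice Smith
```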

Append to array properties (works on both NodeSet and RelationshipSet):

# Append to 'tags' array instead of replacing
articles = NodeSet(['Article'], merge_keys=['id'], append_props=['tags'])
articles.merge(driver)

# If node exists with tags=['tech'], and new node has tags=['python']
# Result: tags=['tech', 'python']

Set merge behavior at creation:

# Define merge behavior when creating NodeSet
users = NodeSet(
    ['User'], ['username'], 
    preserve=['registration_date'],
    append_props=['login_history']
)


Error Handling and Validation

Connection Errors

from neo4j.exceptions import ServiceUnavailable

try:
    people.create(driver)
except ServiceUnavailable:
    print("Neo4j database is not available")
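
For transient outages, a retry wrapper is one option. The sketch below is a generic helper under stated assumptions: `run_with_retry` is a hypothetical name, the exception types to retry on are passed in by the caller, and the demo uses the built-in ConnectionError with a fake loader so it runs without a database.

```python
import time
from typing import Callable, List, Tuple, Type


def run_with_retry(
    operation: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run operation, retrying with exponential backoff; return the successful attempt number."""
    for attempt in range(1, attempts + 1):
        try:
            operation()
            return attempt
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


# Demo: a fake loader that fails twice before succeeding
calls = {"count": 0}


def flaky_load() -> None:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("database not reachable yet")


delays: List[float] = []
succeeded_on = run_with_retry(flaky_load, retry_on=(ConnectionError,), sleep=delays.append)
print(succeeded_on, delays)  # 3 [1.0, 2.0]
```

With the containers from this guide, the call might look like `run_with_retry(lambda: people.create(driver), retry_on=(ServiceUnavailable,))`. Retrying create() after a partial failure can produce duplicate nodes, so merge() is usually the safer operation to retry.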

Data Validation

def validate_email(email):
    return '@' in email

# Validate data before adding
for person_data in source_data:
    if validate_email(person_data.get('email', '')):
        people.add(person_data)
    else:
        print(f"Invalid email: {person_data}")

Missing Reference Nodes

# RelationshipSets don't validate that referenced nodes exist
# This relationship will be silently ignored if nodes don't exist
employments.add(
    {'email': 'nonexistent@example.com'},  # Node doesn't exist
    {'name': 'ACME Corp'},
    {'position': 'Developer'}
)

Important

Graphio does not validate that nodes referenced in relationships actually exist. Ensure proper loading order: nodes before relationships.
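
Since missing endpoints are not reported, one possible safeguard is to check relationships against the nodes you are about to load. The sketch below works on plain dictionaries and tuples kept alongside the containers; `find_dangling` and `key_of` are hypothetical helpers, and the check only covers nodes in the current batch, not nodes that already exist in the database.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Props = Dict[str, Any]
Relationship = Tuple[Props, Props, Props]  # (start matcher, end matcher, properties)


def key_of(props: Props, keys: Sequence[str]) -> Tuple[Any, ...]:
    """Build a hashable lookup key from the given property names."""
    return tuple(props.get(k) for k in keys)


def find_dangling(
    relationships: Iterable[Relationship],
    start_nodes: Iterable[Props],
    end_nodes: Iterable[Props],
    start_keys: Sequence[str],
    end_keys: Sequence[str],
) -> List[Relationship]:
    """Return relationships whose start or end matcher has no node in this batch."""
    known_starts = {key_of(node, start_keys) for node in start_nodes}
    known_ends = {key_of(node, end_keys) for node in end_nodes}
    return [
        rel
        for rel in relationships
        if key_of(rel[0], start_keys) not in known_starts
        or key_of(rel[1], end_keys) not in known_ends
    ]


person_props = [{"name": "Alice", "email": "alice@example.com"}]
company_props = [{"name": "ACME Corp"}]
pending = [
    ({"email": "alice@example.com"}, {"name": "ACME Corp"}, {"position": "Developer"}),
    ({"email": "nonexistent@example.com"}, {"name": "ACME Corp"}, {"position": "Developer"}),
]

dangling = find_dangling(pending, person_props, company_props, ["email"], ["name"])
print(len(dangling))  # 1
```

Dangling entries can then be logged or routed to a review queue before anything is written.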


Real-World Example: ETL Pipeline

from graphio import NodeSet, RelationshipSet
from neo4j import GraphDatabase
import pandas as pd

def load_employee_data(csv_file, driver):
    # Read source data
    df = pd.read_csv(csv_file)

    # Create containers with built-in deduplication
    employees = NodeSet(['Person', 'Employee'], merge_keys=['employee_id'])
    departments = NodeSet(['Department'], merge_keys=['name'], deduplicate=True)  # Deduplicate departments
    works_in = RelationshipSet('WORKS_IN', ['Employee'], ['Department'], 
                              ['employee_id'], ['name'])

    # Process data
    for _, row in df.iterrows():
        # Add employee
        employees.add({
            'employee_id': row['id'],
            'name': row['full_name'],
            'email': row['email'],
            'hire_date': row['start_date']
        })

        # Add department - automatically deduplicated
        departments.add({
            'name': row['department'],
            'budget': row['dept_budget']
        })

        # Add relationship
        works_in.add(
            {'employee_id': row['id']},
            {'name': row['department']},
            {'start_date': row['start_date']}
        )

    # Create indexes for performance
    employees.create_index(driver)
    departments.create_index(driver)

    # Load data in correct order
    departments.merge(driver)  # Departments first
    employees.merge(driver)    # Then employees  
    works_in.merge(driver)     # Finally relationships

    print(f"Loaded {len(employees.nodes)} employees into {len(departments.nodes)} departments")

# Usage
driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))
load_employee_data('employees.csv', driver)

This example demonstrates:

  • Reading from external data source
  • Deduplicating repeated departments while processing rows
  • Using appropriate merge strategies
  • Creating indexes for performance
  • Loading in correct order

Best Practices Summary

  1. 🚀 Performance

       • Create indexes before bulk loading
       • Load nodes before relationships
       • Use create() for fresh data, merge() for updates

  2. 💾 Memory Management

       • Process very large datasets in chunks
       • Use deduplicate=True for automatic duplicate prevention

  3. 🔍 Data Quality

       • Validate data before adding to containers
       • Handle missing reference nodes appropriately
       • Use proper merge keys for your domain

  4. ⚡ Optimization

       • Batch related operations together
       • Use default properties to reduce repetitive data
       • Monitor loading progress for large datasets
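
The advice to process very large datasets in chunks and to monitor progress can be sketched as follows. This is an illustration, not a Graphio feature: `chunked` and `load_in_chunks` are hypothetical helpers, and the demo uses an in-memory CSV with a fake loader so that it runs without a database.

```python
import csv
import io
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, str]


def chunked(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield lists of at most size rows, reading the source lazily."""
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


def load_in_chunks(
    rows: Iterable[Row],
    load_chunk: Callable[[List[Row]], None],
    chunk_size: int = 10_000,
    report: Callable[[str], None] = print,
) -> int:
    """Pass each chunk to load_chunk and report cumulative progress."""
    total = 0
    for number, chunk in enumerate(chunked(rows, chunk_size), start=1):
        load_chunk(chunk)
        total += len(chunk)
        report(f"chunk {number}: {total} rows loaded so far")
    return total


# With Graphio, load_chunk might build a fresh container per chunk, e.g.:
#   def load_chunk(chunk):
#       people = NodeSet(['Person'], merge_keys=['email'])
#       people.add_nodes(chunk)
#       people.merge(driver)

csv_text = "id,name\n" + "".join(f"{i},person-{i}\n" for i in range(7))
loaded_chunks: List[int] = []
total = load_in_chunks(
    csv.DictReader(io.StringIO(csv_text)),
    lambda chunk: loaded_chunks.append(len(chunk)),
    chunk_size=3,
)
print(total, loaded_chunks)  # 7 [3, 3, 1]
```

Creating a new container per chunk keeps memory bounded by the chunk size rather than by the size of the source file.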

Integration with OGM

Bulk loading works seamlessly with OGM models. Use OGM to define your data structure and validation, then leverage bulk loading for high-performance operations.

Why Combine OGM + Bulk Loading?

  • OGM provides: Structure, validation, type safety, intuitive queries
  • Bulk loading provides: High performance, memory efficiency, automatic batching
  • Together: Best developer experience with maximum performance

Quick Integration Example

from graphio import NodeModel, NodeSet, Base

# Define structure with OGM
class Employee(NodeModel):
    _labels = ['Person', 'Employee']
    _merge_keys = ['employee_id']

    name: str
    employee_id: str
    email: str
    department: str

# Set up indexes using OGM
Base.set_driver(driver)
Base.create_indexes()

# Get bulk container directly from OGM model
employees = Employee.dataset()  # Automatically uses Employee's configuration

# Validate with OGM, load with bulk
import pandas as pd
df = pd.read_csv('employees.csv')

for _, row in df.iterrows():
    # Validate data structure with OGM
    emp_data = {
        'name': row['name'],
        'employee_id': row['id'], 
        'email': row['email'],
        'department': row['dept']
    }
    employee = Employee(**emp_data)  # Validates or raises error

    # Add to bulk container
    employees.add(employee.model_dump())

# Bulk load validated data
employees.create(driver)

# Query with OGM convenience
tech_employees = Employee.match(Employee.department == 'Technology').all()

Integration Patterns

Pattern 1: Validation-First Loading

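# Assumption: OGM models are Pydantic-based (they expose model_dump()),
# so validation failures raise pydantic.ValidationError
from pydantic import ValidationError

def validated_bulk_load(model_class, data_source, driver):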
    """Generic function to bulk load any OGM model with validation"""

    # Create bulk container using dataset() method
    bulk_container = model_class.dataset()

    valid_count = 0
    error_count = 0

    for record in data_source:
        try:
            # Validate with OGM model
            instance = model_class(**record)
            bulk_container.add(instance.model_dump())
            valid_count += 1
        except ValidationError as e:
            print(f"Skipping invalid record: {e}")
            error_count += 1

    # Bulk load validated data
    bulk_container.create(driver)

    return valid_count, error_count

# Usage with any model
class Product(NodeModel):
    _labels = ['Product']
    _merge_keys = ['sku']
    sku: str
    name: str
    price: float

loaded, errors = validated_bulk_load(Product, product_data, driver)

Pattern 2: Mixed Operations

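from datetime import datetime

import pandas as pd

# Base and Product come from the earlier snippets
# Use different approaches for different operations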
class InventoryManager:
    def __init__(self, driver):
        self.driver = driver
        Base.set_driver(driver)

    def daily_import(self, csv_file):
        """Bulk load daily inventory updates"""
        products = Product.dataset()  # Use Product model's dataset

        df = pd.read_csv(csv_file)
        for _, row in df.iterrows():
            products.add({
                'sku': row['sku'],
                'stock_count': row['stock'],
                'last_updated': datetime.now()
            })

        products.merge(self.driver)  # Update existing products
        return len(products.nodes)

    def price_adjustment(self, sku: str, new_price: float):
        """Individual price update using OGM"""
        product = Product.match(Product.sku == sku).first()
        if product:
            product.price = new_price
            product.merge()
            return True
        return False

Performance Benefits

When you combine OGM + bulk loading effectively:

# ❌ Slow: Individual OGM operations for bulk data
for record in large_dataset:  # 10,000 records
    person = Person(**record)
    person.merge()  # 10,000 individual database calls!

# ✅ Fast: Validate with OGM, load with bulk
people = Person.dataset()  # Use dataset() method
for record in large_dataset:  # 10,000 records
    person = Person(**record)  # Validate locally
    people.add(person.model_dump())  # Add to batch

people.create(driver)  # Single efficient bulk operation

Real-World Integration: Data Pipeline

class ETLPipeline:
    def __init__(self, driver):
        self.driver = driver
        Base.set_driver(driver)
        Base.create_indexes()  # Create all OGM model indexes

    def process_customers(self, customer_file):
        """Load customer data with validation"""
        customers = Customer.dataset()  # Use Customer model's dataset
        addresses = Address.dataset()   # Use Address model's dataset
        lives_at = Customer.lives_at.dataset()  # Use relationship's dataset

        df = pd.read_csv(customer_file)
        for _, row in df.iterrows():
            # Validate customer data
            customer_data = {
                'email': row['email'],
                'name': row['name'],
                'phone': row['phone']
            }
            Customer(**customer_data)  # Validates

            # Validate address data  
            address_data = {
                'address_id': row['address_id'],
                'street': row['street'],
                'city': row['city'],
                'country': row['country']
            }
            Address(**address_data)  # Validates

            # Add to bulk containers
            customers.add(customer_data)
            addresses.add(address_data)
            lives_at.add(
                {'email': row['email']},
                {'address_id': row['address_id']},
                {'since': row['move_in_date']}
            )

        # Load in correct order
        addresses.create(self.driver)
        customers.create(self.driver) 
        lives_at.create(self.driver)

        return len(customers.nodes)

    def customer_analytics(self):
        """Use OGM for complex analytics after bulk loading"""
        from graphio.ogm.model import CypherQuery

        # Find customers in major cities
        # Complex relationship queries require custom Cypher
        major_city_query = CypherQuery("""
            MATCH (c:Customer)-[:LIVES_AT]->(a:Address)
            WHERE a.city IN ['London', 'Paris', 'Berlin']
            RETURN DISTINCT c
        """)
        major_city_customers = Customer.match(major_city_query).all()

        return [c.name for c in major_city_customers]

# Usage
pipeline = ETLPipeline(driver)
loaded = pipeline.process_customers('customers.csv')  # Bulk + validation
analytics = pipeline.customer_analytics()  # OGM queries

Integration Best Practices

  1. Use OGM for structure definition

    # Define once with OGM
    class Product(NodeModel):
        _labels = ['Product'] 
        _merge_keys = ['sku']
    
    # Get bulk container directly from model
    products = Product.dataset()  # Automatically matches model configuration
    

  2. Leverage OGM validation

    # Validate before adding to bulk container
    for data in source:
        model_instance = Product(**data)  # Validates
        bulk_container.add(model_instance.model_dump())
    

  3. Use registry for index management

    # Create indexes for all models at once
    Base.create_indexes()
    
    # Then use bulk loading with existing indexes
    products.create(driver)
    

  4. Choose the right approach for each task

    # Bulk loading: High-volume, repetitive operations
    products.create(driver)  # Load 10,000 products
    
    # OGM: Application logic, complex queries
    product = Product.match(Product.sku == 'ABC123').first()
    recommendations = product.similar_products.match().all()
    

This hybrid approach gives you the best of both worlds: the structure and developer experience of OGM with the performance of bulk loading.

For complete OGM documentation, see the OGM Guide.