Managing enrichers

Quick start guide to creating Enricher for your OSINT investigations.

Understanding Enrichers

Enrichers are the high-level business logic layer in Flowsint. While types define data structures and tools wrap external utilities, enrichers orchestrate the entire intelligence gathering workflow. An enricher takes input data of one type, processes it through various tools or APIs, validates and enriches the results, creates graph database nodes and relationships, and returns structured output.

Every enricher in Flowsint follows a two-phase execution model. The scan phase contains the core enriching logic where tools are executed, APIs are called, and data is gathered. Then, the postprocessing phase creates Neo4j graph nodes and relationships while returning the processed results. Input validation and normalization happens automatically through Pydantic type validation.

Enrichers differ from tools in several fundamental ways. Tools are low-level wrappers that return raw data without knowledge of the Flowsint ecosystem. Enrichers are high-level workflows that understand types, create graph nodes, handle parameters, and orchestrate multiple tools. When you want to add a new data source, you create a tool. When you want to add a new intelligence workflow, you create an enricher.

Enricher architecture

The enricher system is built around an abstract base class that defines the interface and execution flow. Every enricher you create inherits from this base class and implements specific methods.

The Enricher base class

The base class lives at flowsint-core/src/flowsint_core/core/enricher_base.py and provides the framework for all enrichers. Here's what a minimal enricher looks like:

from typing import List
from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_types import Domain, Ip
 
@flowsint_enricher
class MyEnricher(Enricher):
    """Description of what this enricher does."""
 
    # Define input and output types as base types (not lists)
    InputType = Domain
    OutputType = Ip
 
    @classmethod
    def name(cls) -> str:
        """Unique identifier for this enricher."""
        return "domain_to_ip"
 
    @classmethod
    def category(cls) -> str:
        """Category this enricher belongs to."""
        return "Domain"
 
    @classmethod
    def key(cls) -> str:
        """Primary key field name for this enricher."""
        return "domain"
 
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Core enriching logic."""
        pass
 
    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """Create graph nodes and relationships."""
        pass
 
# Export types for easy access
InputType = MyEnricher.InputType
OutputType = MyEnricher.OutputType

The InputType and OutputType class attributes define what data the enricher accepts and returns. These should be Pydantic types from the flowsint-types package defined as base types (e.g., Domain, not List[Domain]). The base class uses these type definitions to automatically generate JSON schemas for the API and handle validation automatically.

At the end of the file, you should export the types for easy access by other modules.

The two phases

Understanding the two execution phases is crucial for writing effective enrichers.

Scanning is where the real work happens. This async method receives validated input data as a list of InputType instances and executes your intelligence gathering logic. You might instantiate tools, call external APIs, process results, and build up your output data. This phase should focus purely on gathering and processing data without worrying about the graph database. The base class automatically handles input validation through Pydantic before the scan phase begins.

Postprocessing creates the graph database structure. After scanning completes, this method receives both the results and the original input. It creates Neo4j nodes for each entity, establishes relationships between them, and returns the final results. This separation keeps graph logic separate from business logic.

Creating a simple enricher

Let's walk through creating a complete enricher from scratch. We'll build an enricher that converts domains to IP addresses using DNS resolution.

Setting up the file structure

Enrichers are organized by their input type. Create a new file in the appropriate directory under flowsint-enrichers/src/flowsint_enrichers/:

cd flowsint-enrichers/src/flowsint_enrichers/domain/
touch to_ip.py

If you're creating an enricher for a new input type, you may need to create a new directory first.

Implementing the basic structure

Start with the imports and class definition:

import socket
from typing import List
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain, Ip
 
@flowsint_enricher
class DomainToIpEnricher(Enricher):
    """Resolves domain names to their IP addresses using DNS."""
 
    # Define types as base types (not lists)
    InputType = Domain
    OutputType = Ip
 
    @classmethod
    def name(cls) -> str:
        return "domain_to_ip"
 
    @classmethod
    def category(cls) -> str:
        return "Domain"
 
    @classmethod
    def key(cls) -> str:
        return "domain"
 
    @classmethod
    def documentation(cls) -> str:
        return """
        This enricher resolves domain names to their IP addresses using
        standard DNS queries. It accepts a list of domains and returns
        the corresponding IP addresses.
        """
 
# Export types at the end of the file
InputType = DomainToIpEnricher.InputType
OutputType = DomainToIpEnricher.OutputType

The name() method returns a unique identifier for this enricher. Use lowercase with underscores, following the pattern inputtype_to_outputtype. The category() groups related enrichers together in the UI. The key() specifies which field serves as the primary identifier, typically matching the input type.

Implementing the scan logic

The scan method contains your core intelligence gathering logic. It receives a list of validated InputType instances and returns a list of OutputType instances.

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Resolve each domain to its IP address.
 
        Args:
            data: List of Domain objects to resolve
 
        Returns:
            List of Ip objects
        """
        results: List[OutputType] = []
 
        for domain in data:
            try:
                # Perform DNS resolution
                ip_address = socket.gethostbyname(domain.domain)
                # Create IP object
                ip = Ip(address=ip_address)
                results.append(ip)
                # Log successful resolution
                Logger.info(
                    self.sketch_id,
                    {"message": f"Resolved {domain.domain} to {ip_address}"}
                )
            except socket.gaierror as e:
                # DNS resolution failed
                Logger.info(
                    self.sketch_id,
                    {"message": f"Failed to resolve {domain.domain}: {e}"}
                )
                continue
            except Exception as e:
                # Unexpected error
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error resolving {domain.domain}: {e}"}
                )
                continue
        return results

This implementation iterates through each domain, performs DNS resolution, creates an IP object for successful resolutions, and logs both successes and failures. The error handling ensures that failures don't crash the entire enricher, which is important when processing many domains.

The input data has already been validated by Pydantic before reaching the scan method, so you can trust that all items are proper Domain objects.

Implementing postprocessing

The postprocess method creates graph database nodes and relationships using the new simplified API:

    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """
        Create graph nodes and relationships.
 
        Args:
            results: IP objects from scan phase
            original_input: Original Domain objects
 
        Returns:
            IP objects (unchanged)
        """
        # Create nodes and relationships
        for domain, ip in zip(original_input, results):
            # Create nodes by passing Pydantic objects directly
            self.create_node(domain)
            self.create_node(ip)
 
            # Create relationship by passing Pydantic objects directly
            self.create_relationship(domain, ip, "RESOLVES_TO")
 
            # Log the operation
            self.log_graph_message(
                f"IP found for domain {domain.domain} -> {ip.address}"
            )
 
        return results

You can pass Pydantic objects directly to create_node() and create_relationship(). The methods automatically infer the node types, primary keys, and property values from the Pydantic models.

The create_node() method accepts a Pydantic object and automatically creates a Neo4j node with the correct label and properties. The create_relationship() method takes two Pydantic objects and a relationship type string, inferring all necessary information from the objects.

Creating an enricher with tools

Most enrichers use external tools for data gathering. Let's create an enricher that uses the Subfinder tool for subdomain enumeration.

Importing the tool

Start by importing the tool along with your other dependencies:

from typing import List
from flowsint_core.core.enricher_base import Enricher
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.logger import Logger
from flowsint_types import Domain
from tools.network.subfinder import SubfinderTool
 
@flowsint_enricher
class SubdomainEnricher(Enricher):
    """Enumerates subdomains for given domains using Subfinder."""
 
    # Define types as base types
    InputType = Domain
    OutputType = Domain
 
    @classmethod
    def name(cls) -> str:
        return "domain_to_subdomains"
 
    @classmethod
    def category(cls) -> str:
        return "Domain"
 
    @classmethod
    def key(cls) -> str:
        return "domain"
 
# Export types
InputType = SubdomainEnricher.InputType
OutputType = SubdomainEnricher.OutputType

Using the tool in scan

The scan method instantiates and uses the tool:

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Find subdomains using Subfinder tool.
        Args:
            data: List of Domain objects
        Returns:
            List of discovered subdomain Domain objects
        """
        results: List[OutputType] = []
        # Instantiate the tool
        subfinder = SubfinderTool()
 
        for domain in data:
            Logger.info(
                self.sketch_id,
                {"message": f"Enumerating subdomains for {domain.domain}"}
            )
            try:
                # Launch the tool
                subdomains = subfinder.launch(domain.domain)
                # Convert strings to Domain objects
                for subdomain in subdomains:
                    results.append(Domain(domain=subdomain, root=False))
                Logger.info(
                    self.sketch_id,
                    {"message": f"Found {len(subdomains)} subdomains for {domain.domain}"}
                )
            except Exception as e:
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error enumerating subdomains for {domain.domain}: {e}"}
                )
                continue
        return results

Notice how the tool returns raw strings, and the enricher converts them into proper Domain objects. This separation of concerns keeps tools simple while enrichers handle type conversion.

Creating graph nodes and relationships

The postprocess phase creates parent-child relationships between domains and subdomains:

    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """
        Create graph nodes and relationships for domains and subdomains.
 
        Args:
            results: Discovered subdomain Domain objects
            original_input: Original parent Domain objects
 
        Returns:
            Subdomain Domain objects
        """
        # Create nodes for parent domains
        for domain in original_input:
            self.create_node(domain)
 
        # Create nodes for subdomains and relationships
        for subdomain in results:
            self.create_node(subdomain)
 
            # Extract parent domain name and create relationship
            parent_domain_name = self._extract_parent_domain(subdomain.domain)
            parent_domain = Domain(domain=parent_domain_name)
 
            # Create relationship using Pydantic objects
            self.create_relationship(parent_domain, subdomain, "HAS_SUBDOMAIN")
 
            # Log the operation
            self.log_graph_message(
                f"Subdomain found: {parent_domain_name} -> {subdomain.domain}"
            )
 
        return results
 
    def _extract_parent_domain(self, subdomain: str) -> str:
        """Extract parent domain from subdomain."""
        parts = subdomain.split('.')
        if len(parts) >= 2:
            return '.'.join(parts[-2:])
        return subdomain

Adding parameters to enrichers

Many enrichers need user-configurable parameters. Let's create an enricher that scans ports with configurable options.

Defining the parameter schema

The get_params_schema() class method defines what parameters your enricher accepts:

from typing import List, Dict, Any, Optional
flowsint_enriche
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from flowsint_types import Ip, Port
from tools.network.naabu import NaabuTool
 
@flowsint_enricher
class IpToPortsEnricher(Enricher):
    """Scans IP addresses for open ports."""
 
    # Define types as base types
    InputType = Ip
    OutputType = Port
 
    @classmethod
    def name(cls) -> str:
        return "ip_to_ports"
 
    @classmethod
    def category(cls) -> str:
        return "IP"
 
    @classmethod
    def key(cls) -> str:
        return "address"
 
    @classmethod
    def get_params_schema(cls) -> List[Dict[str, Any]]:
        """Define configurable parameters for this enricher."""
        return [
            {
                "name": "mode",
                "type": "select",
                "description": "Scan mode: active scanning or passive enumeration",
                "required": True,
                "default": "passive",
                "options": [
                    {"label": "Passive", "value": "passive"},
                    {"label": "Active", "value": "active"},
                ],
            },
            {
                "name": "port_range",
                "type": "string",
                "description": "Port range to scan (e.g., '1-1000' or '80,443,8080')",
                "required": False,
            },
            {
                "name": "top_ports",
                "type": "select",
                "description": "Scan only the most common ports",
                "required": False,
                "options": [
                    {"label": "Top 100", "value": "100"},
                    {"label": "Top 1000", "value": "1000"},
                ],
            },
            {
                "name": "PDCP_API_KEY",
                "type": "vaultSecret",
                "description": "ProjectDiscovery Cloud Platform API key for passive mode",
                "required": False,
            },
        ]
 
# Export types
InputType = IpToPortsEnricher.InputType
OutputType = IpToPortsEnricher.OutputType

The parameter schema defines the type, description, whether it's required, default values, and for select parameters, the available options. The vaultSecret type integrates with Flowsint's encrypted credential storage.

Using parameters in your enricher

Parameters are accessed through self.params in your scan method:

    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """
        Scan IPs for open ports using configured parameters.
        Args:
            data: List of Ip objects to scan
        Returns:
            List of Port objects
        """
        results: List[OutputType] = []
        # Extract parameters
        mode = self.params.get("mode", "passive")
        port_range = self.params.get("port_range")
        top_ports = self.params.get("top_ports")
        api_key = self.get_secret("PDCP_API_KEY")
 
        # Instantiate tool
        naabu = NaabuTool()
 
        for ip in data:
            Logger.info(
                self.sketch_id,
                {"message": f"Scanning {ip.address} in {mode} mode"}
            )
            try:
                # Launch tool with parameters
                scan_results = naabu.launch(
                    target=ip.address,
                    mode=mode,
                    port_range=port_range,
                    top_ports=top_ports,
                    api_key=api_key
                )
                # Convert tool results to Port objects
                for result in scan_results:
                    port = Port(
                        number=result.get("port"),
                        protocol=result.get("protocol", "tcp").upper(),
                        state="open",
                        service=result.get("service"),
                        banner=result.get("version")
                    )
                    results.append(port)
            except Exception as e:
                Logger.error(
                    self.sketch_id,
                    {"message": f"Error scanning {ip.address}: {e}"}
                )
                continue
        return results

Handling multiple output types

Some enrichers produce multiple types of results. You can define a custom return type using Pydantic:

from pydantic import BaseModel
from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
from typing import List
from flowsint_types import Website, Email, Phone
 
class CrawlerResults(BaseModel):
    """Results from web crawler including multiple entity types."""
    website: Website
    emails: List[Email] = []
    phones: List[Phone] = []
 
@flowsint_enricher
class WebsiteToCrawlerEnricher(Enricher):
    """Crawls websites to extract emails and phone numbers."""
 
    # Define types as base types
    InputType = Website
    OutputType = CrawlerResults
 
    async def scan(self, data: List[InputType]) -> List[OutputType]:
        """Crawl websites and extract contact information."""
        from tools.network.reconcrawl import ReconCrawlTool
 
        results: List[OutputType] = []
        crawler_tool = ReconCrawlTool()
 
        for website in data:
            try:
                # Launch crawler
                crawl_data = crawler_tool.launch(website.url)
 
                # Extract entities
                emails = [Email(email=e) for e in crawl_data.get("emails", [])]
                phones = [Phone(number=p) for p in crawl_data.get("phones", [])]
 
                # Create result object
                result = CrawlerResults(
                    website=website,
                    emails=emails,
                    phones=phones
                )
                results.append(result)
 
            except Exception as e:
                Logger.error(self.sketch_id, {"message": f"Crawl error: {e}"})
 
        return results
 
    def postprocess(self, results: List[OutputType], original_input: List[InputType]) -> List[OutputType]:
        """Create nodes for all discovered entities."""
        for result in results:
            # Create website node using Pydantic object
            self.create_node(result.website)
 
            # Create email nodes and relationships
            for email in result.emails:
                self.create_node(email)
                self.create_relationship(result.website, email, "HAS_EMAIL")
 
            # Create phone nodes and relationships
            for phone in result.phones:
                self.create_node(phone)
                self.create_relationship(result.website, phone, "HAS_PHONE")
 
            self.log_graph_message(
                f"Processed {len(result.emails)} emails and {len(result.phones)} phones from {result.website.url}"
            )
 
        return results
 
# Export types
InputType = WebsiteToCrawlerEnricher.InputType
OutputType = WebsiteToCrawlerEnricher.OutputType

Registering your enricher

You don't need to register your enricher anywhere, adding the decorator @flowsint_enricher to your enricher class triggers the auto discovery.

from flowsint_enrichers.registry import flowsint_enricher
from flowsint_core.core.enricher_base import Enricher
 
@flowsint_enricher
class MyEnricher(Enricher):
...

Testing your enricher

Creating tests helps ensure your enricher works correctly and makes debugging easier. Create a test file in flowsint-enrichers/tests/:

# tests/test_domain_to_ip.py
import pytest
from flowsint_enrichers.domain.to_ip import DomainToIpEnricher
from flowsint_types import Domain, Ip
 
@pytest.mark.asyncio
async def test_enricher_metadata():
    """Test enricher metadata is correctly defined."""
    assert DomainToIpEnricher.name() == "domain_to_ip"
    assert DomainToIpEnricher.category() == "Domain"
    assert DomainToIpEnricher.key() == "domain"
 
@pytest.mark.asyncio
async def test_type_definitions():
    """Test InputType and OutputType are correctly defined."""
    assert DomainToIpEnricher.InputType == Domain
    assert DomainToIpEnricher.OutputType == Ip
 
@pytest.mark.asyncio
async def test_scan():
    """Test DNS resolution works."""
    enricher = DomainToIpEnricher(sketch_id="test", scan_id="test")
    input_data = [Domain(domain="example.com")]
    results = await enricher.scan(input_data)
 
    assert len(results) > 0
    assert isinstance(results[0], Ip)
    assert results[0].address  # Should have an IP address

These tests verify that your enricher's metadata is correct, type definitions are properly set, and the scan logic produces expected results. Input validation is handled automatically by Pydantic, so you don't need to test preprocessing separately.

Best practices

When creating enrichers, think carefully about error handling. Intelligence gathering involves many external systems that can fail in unpredictable ways. Your enricher should handle errors gracefully, log failures clearly, and continue processing remaining items rather than crashing entirely.

Always use the Logger utility for tracking progress and errors. The logger integrates with Flowsint's monitoring system and helps users understand what's happening during long-running enrichers. Log successful operations at the info level and errors at the error level.

Define InputType and OutputType as base types (e.g., Domain, not List[Domain]). The base class automatically handles the list wrapping and validation. This makes the type definitions cleaner and more intuitive.

Always export your types at the end of the file using:

InputType = YourEnricher.InputType
OutputType = YourEnricher.OutputType

Use the simplified graph API by passing Pydantic objects directly to create_node() and create_relationship(). This eliminates boilerplate code and reduces errors. The methods automatically infer node types, primary keys, and properties from your Pydantic models.

Separate concerns between the two phases. Scanning should focus on gathering and processing data. Postprocessing should create graph structures. Input validation happens automatically through Pydantic, so you don't need to handle it manually.

Use type hints everywhere. They provide automatic validation, better IDE support, and serve as inline documentation. The InputType and OutputType class attributes should always be Pydantic types from the flowsint-types package.

When working with tools, remember that they return raw data structures. Your enricher is responsible for converting tool output into proper Flowsint types. This type conversion is typically done in the scan phase.

Document your enricher thoroughly. The class docstring, documentation method, and parameter descriptions all appear in the UI. Clear documentation helps users understand what your enricher does and how to configure it.

Handling API Rate Limits

When working with rate-limited APIs, add delays between requests:

    async def scan(self, data: InputType) -> OutputType:
        """Scan with rate limiting to respect API limits."""
        import asyncio
        results = []
        delay_seconds = 1  # Delay between requests
        for item in data:
            result = await self._query_api(item)
            if result:
                results.append(result)
            # Respect rate limits
            await asyncio.sleep(delay_seconds)
        return results

Fallback data sources

Implement fallback logic when primary sources fail:

    async def scan(self, data: InputType) -> OutputType:
        """Try multiple data sources with fallback logic."""
        results = []
 
        for domain in data:
            # Try primary source
            result = self._query_primary_source(domain)
 
            if not result:
                # Fall back to secondary source
                Logger.info(
                    self.sketch_id,
                    {"message": f"Primary source failed for {domain}, trying fallback"}
                )
                result = self._query_fallback_source(domain)
 
            if result:
                results.append(result)
 
        return results

Troubleshooting

If your enricher doesn't appear in the API after registration, verify that you've imported it in registry.py, called EnricherRegistry.register(), and restarted the API server. The registry is loaded at startup, so changes require a restart.

For import errors, make sure all your dependencies are installed and the enricher file has no syntax errors. Check that you're importing from the correct packages.

If the scan method isn't finding any results, add logging statements to debug what's happening. Verify that tools are installed and accessible, API keys are valid if required, and input data is in the expected format.

When graph relationships aren't appearing, check that you're creating both nodes and relationships in the postprocess method. Verify that the relationship type name is correct and that you're passing the right node objects to create_relationship().

Next steps

Once you've created and registered your enricher, it becomes available through the API for users to run. Enrichers can be chained together into Flows where the output of one enricher feeds into the input of another. This enables complex intelligence gathering sequences.

Remember that enrichers are the heart of Flowsint's intelligence gathering capabilities. Well-designed enrichers that handle errors gracefully, log progress clearly, and create meaningful graph relationships make the entire platform more powerful and user-friendly.

If you create new enrichers that you think can help the community, it's highly appreciated that you open-source them. Help the community as much as it helps you !

Need troubleshooting or spotted a bug ? Feel free to submit an issue here.