Advanced Usage Examples

This section provides advanced usage examples for the edgar-sec library, including caching, rate limiting, concurrent requests, and sophisticated data extraction techniques.

Caching and Rate Limiting

The edgar-sec library includes built-in support for caching and rate limiting to optimize API usage and comply with SEC guidelines.

import edgar_sec as ed

# Initialize the API client with caching enabled
edgar = ed.EdgarAPI(cache_mode=True)

# First request will hit the API
apple_facts = edgar.get_company_facts("0000320193")
print(f"Number of taxonomies: {len(apple_facts.facts)}")

# Subsequent identical requests will use cache (much faster)
apple_facts_cached = edgar.get_company_facts("0000320193")

# The built-in rate limiter ensures you don't exceed 10 API calls per second
# This is handled automatically for both synchronous and asynchronous requests
for cik in ["0000320193", "0000789019", "0001652044", "0001018724", "0001326801"]:
    company = edgar.get_submissions(central_index_key=cik)
    print(f"Retrieved data for {company.name}")

Concurrent Requests with AsyncAPI

You can use the Async attribute to make concurrent requests for improved performance.

import asyncio
import edgar_sec as ed

async def fetch_multiple_companies():
    edgar = ed.EdgarAPI(cache_mode=True)

    # Define company CIKs
    companies = {
        "0000320193": "Apple",      # Apple
        "0000789019": "Microsoft",  # Microsoft
        "0001652044": "Alphabet",   # Alphabet (Google)
        "0001018724": "Amazon",     # Amazon
        "0001326801": "Meta"        # Meta (Facebook)
    }

    # Fetch submission history for multiple companies concurrently
    submission_tasks = [
        edgar.Async.get_submissions(cik)
        for cik in companies.keys()
    ]
    submissions = await asyncio.gather(*submission_tasks)

    # Process the results
    for submission, name in zip(submissions, companies.values()):
        print(f"{name} ({submission.name}):")
        print(f"  Ticker: {submission.tickers}")
        print(f"  Recent filing: {submission.filings[0].form} on {submission.filings[0].filing_date}")
        print()

    # Now fetch the same concept across all companies
    concept_tasks = [
        edgar.Async.get_company_concept(cik, "us-gaap", "Assets")
        for cik in companies.keys()
    ]
    concepts = await asyncio.gather(*concept_tasks)

    # Compare assets across companies
    print("Total Assets Comparison:")
    for concept, name in zip(concepts, companies.values()):
        # Get the most recent USD disclosure if available
        if concept.units and hasattr(concept.units[0], 'val'):
            latest = concept.units[0]
            print(f"  {name}: ${latest.val:,} ({latest.fy} {latest.fp})")

# Run the async function
asyncio.run(fetch_multiple_companies())

Advanced Data Extraction

Extract specific financial metrics from company facts for analysis.

import edgar_sec as ed
import pandas as pd
import matplotlib.pyplot as plt

# Initialize the API client
edgar = ed.EdgarAPI(cache_mode=True)

# Fetch all Apple facts
apple_facts = edgar.get_company_facts("0000320193")

# Extract revenue time series
if "us-gaap" in apple_facts.facts:
    # Look for various revenue concept tags (may vary by company)
    revenue_tags = [
        "Revenue",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "SalesRevenueNet",
        "RevenueFromContractWithCustomer"
    ]

    # Find the first matching tag
    revenue_tag = next((tag for tag in revenue_tags if tag in apple_facts.facts["us-gaap"].disclosures), None)

    if revenue_tag and "USD" in apple_facts.facts["us-gaap"].disclosures[revenue_tag].units:
        # Extract revenue data
        revenue_data = []
        for fact in apple_facts.facts["us-gaap"].disclosures[revenue_tag].units["USD"]:
            # Only include annual (FY) or quarterly (Q1-Q4) data
            if fact.fp in ["FY", "Q1", "Q2", "Q3", "Q4"]:
                revenue_data.append({
                    "period": f"{fact.fy} {fact.fp}",
                    "date": fact.end,
                    "revenue": fact.val,
                    "form": fact.form
                })

        # Convert to DataFrame for analysis
        df = pd.DataFrame(revenue_data)
        df["date"] = pd.to_datetime(df["date"])
        df = df.sort_values("date")

        # Only keep 10-K and 10-Q reports
        df = df[df["form"].isin(["10-K", "10-Q"])]

        # Plot the revenue trend
        plt.figure(figsize=(12, 6))
        plt.plot(df["date"], df["revenue"] / 1e9)  # Convert to billions
        plt.title(f"Apple Inc. - {revenue_tag} Over Time")
        plt.xlabel("Date")
        plt.ylabel("Revenue (Billions USD)")
        plt.grid(True)
        plt.show()

Cross-Company Analysis with Frames

Use the frames API to compare the same financial concept across multiple companies.

import edgar_sec as ed
import pandas as pd
import matplotlib.pyplot as plt

# Initialize the API client
edgar = ed.EdgarAPI(cache_mode=True)

# Get assets for all companies for Q4 2022
assets_frame = edgar.get_frames(
    taxonomy="us-gaap",
    tag="Assets",
    unit="USD",
    period="CY2022Q4I"  # Calendar Year 2022, Q4, Instantaneous
)

print(f"Total companies reporting: {assets_frame.pts}")

# Extract the top 10 companies by assets
top_companies = sorted(assets_frame.frames, key=lambda x: x.val, reverse=True)[:10]

# Convert to DataFrame
df = pd.DataFrame([
    {"Company": company.entity_name, "Assets (Billions)": company.val / 1e9}
    for company in top_companies
])

# Create a horizontal bar chart
plt.figure(figsize=(12, 8))
plt.barh(df["Company"], df["Assets (Billions)"])
plt.title("Top 10 Companies by Total Assets (Q4 2022)")
plt.xlabel("Total Assets (Billions USD)")
plt.grid(True, axis="x")
plt.tight_layout()
plt.show()

Error Handling and Validation

Implement robust error handling to manage API limitations and issues.

import edgar_sec as ed
import httpx
from tenacity import retry, wait_fixed, stop_after_attempt

# Initialize the API client
edgar = ed.EdgarAPI(cache_mode=True)

# Function with enhanced error handling
@retry(wait=wait_fixed(2), stop=stop_after_attempt(3))
def get_company_data(cik):
    try:
        # Attempt to fetch data
        return edgar.get_submissions(central_index_key=cik)
    except ValueError as e:
        # Handle API-specific errors
        if "rate limit" in str(e).lower():
            print(f"Rate limit exceeded, retrying in 2 seconds...")
            raise  # Let retry handle this
        else:
            print(f"API Error: {e}")
            return None
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            print(f"Company with CIK {cik} not found")
            return None
        elif e.response.status_code == 429:
            print(f"Rate limit exceeded, retrying in 2 seconds...")
            raise  # Let retry handle this
        else:
            print(f"HTTP Error: {e}")
            return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Try with valid and invalid CIKs
companies = [
    "0000320193",  # Apple (valid)
    "0000123456",  # Invalid CIK
    "0000789019",  # Microsoft (valid)
]

for cik in companies:
    company = get_company_data(cik)
    if company:
        print(f"Successfully retrieved data for {company.name}")
    else:
        print(f"Failed to retrieve data for CIK {cik}")

Advanced Caching Configuration

Customize the caching behavior to optimize performance for your specific use case.

import edgar_sec as ed
from cachetools import TTLCache
import time

# Create a custom cache with specific size and TTL
custom_cache = TTLCache(maxsize=512, ttl=7200)  # 2 hour TTL, larger cache

# Access the internal attributes to customize the API (advanced usage)
edgar = ed.EdgarAPI(cache_mode=True)
edgar.cache = custom_cache  # Replace the default cache

# Measure performance difference with caching
start_time = time.time()
apple_facts = edgar.get_company_facts("0000320193")
first_request_time = time.time() - start_time

start_time = time.time()
apple_facts_cached = edgar.get_company_facts("0000320193")
cached_request_time = time.time() - start_time

print(f"First request time: {first_request_time:.2f} seconds")
print(f"Cached request time: {cached_request_time:.2f} seconds")
print(f"Speed improvement: {first_request_time / cached_request_time:.1f}x faster")