mirror of
https://github.com/makeplane/plane.git
synced 2026-06-14 03:30:00 +00:00
[WEB-7447] feat: migrate CE telemetry from OTLP traces to OTLP metrics (#9156)
* [WEB-7447] feat: migrate CE telemetry from OTLP traces to OTLP metrics Replace span-based tracing (tracer.py) with OTLP observable gauges, mirroring the approach already used in plane-ee. Key changes: - Add otlp_endpoints.py — shared gRPC/HTTP endpoint helpers - Add telemetry_metrics.py — push_instance_metrics task using MeterProvider + observable gauges (service name: plane-ce-api) - User count excludes bots (is_bot=False) - Page count excludes bot-owned private pages only - Domain derived from WEB_URL env var - Celery beat entry replaced with timedelta schedule + configurable METRICS_PUSH_INTERVAL_MINUTES (default 360 min) - Add explicit opentelemetry-exporter-otlp-proto-grpc dep - Delete tracer.py and telemetry.py (no longer needed) Co-authored-by: Plane AI <noreply@plane.so> * fix: address review comments on CE telemetry metrics - harden grpc_endpoint_from_url for scheme-less OTLP_ENDPOINT values (e.g. "telemetry.plane.so:4317") by prepending "//" before urlparse - fix WEB_URL domain extraction for scheme-less values with same approach - replace N+1 workspace count queries (6×N) with 6 batched annotate(Count) aggregation queries — reduces DB load significantly at WORKSPACE_METRICS_LIMIT - add deterministic ordering (order_by created_at) to workspace slice - harden METRICS_PUSH_INTERVAL_MINUTES env parsing with try/except guard and positive-value validation to avoid crash on malformed input Co-authored-by: Plane AI <noreply@plane.so> * fix: cap METRICS_PUSH_INTERVAL_MINUTES to prevent timedelta overflow Add upper-bound check (10_000_000 minutes) and catch OverflowError alongside ValueError so an arbitrarily large env value cannot crash worker startup via timedelta(minutes=...) OverflowError. Co-authored-by: Plane AI <noreply@plane.so> --------- Co-authored-by: Plane AI <noreply@plane.so>
This commit is contained in:
@@ -5,12 +5,13 @@
|
|||||||
# Python imports
|
# Python imports
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
# Third party imports
|
# Third party imports
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from pythonjsonlogger.json import JsonFormatter
|
from pythonjsonlogger.json import JsonFormatter
|
||||||
from celery.signals import after_setup_logger, after_setup_task_logger
|
from celery.signals import after_setup_logger, after_setup_task_logger
|
||||||
from celery.schedules import crontab
|
from celery.schedules import crontab, schedule
|
||||||
|
|
||||||
# Module imports
|
# Module imports
|
||||||
from plane.settings.redis import redis_instance
|
from plane.settings.redis import redis_instance
|
||||||
@@ -20,6 +21,20 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.production")
|
|||||||
|
|
||||||
ri = redis_instance()
|
ri = redis_instance()
|
||||||
|
|
||||||
|
# Configurable metrics push interval (in minutes)
|
||||||
|
# Default: 360 (6 hours), set to 5 for development/testing
|
||||||
|
# Fallback interval (6 hours) used whenever the configured value is unusable.
_DEFAULT_PUSH_INTERVAL_MINUTES = 360
# Upper bound on the accepted interval; it still allows multi-year intervals
# while keeping timedelta(minutes=...) away from OverflowError.
_MAX_PUSH_INTERVAL_MINUTES = 10_000_000


def _get_metrics_push_interval_minutes() -> int:
    """Return the metrics push interval, in minutes, taken from the environment.

    Reads ``METRICS_PUSH_INTERVAL_MINUTES`` (default ``"360"``).

    Returns:
        The parsed integer when it lies in ``1..10_000_000``; otherwise 360.
        360 is also returned when the value cannot be parsed as an integer.
    """
    configured = os.environ.get("METRICS_PUSH_INTERVAL_MINUTES", "360")
    try:
        minutes = int(configured)
    except (ValueError, OverflowError):
        return _DEFAULT_PUSH_INTERVAL_MINUTES

    # Reject zero, negative and oversized values instead of clamping them.
    if minutes <= 0 or minutes > _MAX_PUSH_INTERVAL_MINUTES:
        return _DEFAULT_PUSH_INTERVAL_MINUTES
    return minutes


METRICS_PUSH_INTERVAL_MINUTES = _get_metrics_push_interval_minutes()
|
||||||
|
|
||||||
app = Celery("plane")
|
app = Celery("plane")
|
||||||
|
|
||||||
# Using a string here means the worker will not have to
|
# Using a string here means the worker will not have to
|
||||||
@@ -32,9 +47,9 @@ app.conf.beat_schedule = {
|
|||||||
"task": "plane.bgtasks.email_notification_task.stack_email_notification",
|
"task": "plane.bgtasks.email_notification_task.stack_email_notification",
|
||||||
"schedule": crontab(minute="*/5"), # Every 5 minutes
|
"schedule": crontab(minute="*/5"), # Every 5 minutes
|
||||||
},
|
},
|
||||||
"run-every-6-hours-for-instance-trace": {
|
"push-instance-metrics": {
|
||||||
"task": "plane.license.bgtasks.tracer.instance_traces",
|
"task": "plane.license.bgtasks.telemetry_metrics.push_instance_metrics",
|
||||||
"schedule": crontab(hour="*/6", minute=0), # Every 6 hours
|
"schedule": schedule(run_every=timedelta(minutes=METRICS_PUSH_INTERVAL_MINUTES)),
|
||||||
},
|
},
|
||||||
# Occurs once every day
|
# Occurs once every day
|
||||||
"check-every-day-to-delete-hard-delete": {
|
"check-every-day-to-delete-hard-delete": {
|
||||||
|
|||||||
@@ -0,0 +1,381 @@
|
|||||||
|
# Copyright (c) 2023-present Plane Software, Inc. and contributors
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
# See the LICENSE file for details.
|
||||||
|
|
||||||
|
# Python imports
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
# Third party imports
|
||||||
|
from celery import shared_task
|
||||||
|
from django.db.models import Count
|
||||||
|
from opentelemetry import metrics
|
||||||
|
from opentelemetry.sdk.metrics import MeterProvider
|
||||||
|
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
||||||
|
from opentelemetry.sdk.resources import Resource
|
||||||
|
|
||||||
|
# Module imports
|
||||||
|
from plane.utils.otlp_endpoints import get_otlp_grpc_endpoint, get_otlp_http_metrics_url
|
||||||
|
from plane.license.models import Instance
|
||||||
|
from plane.db.models import (
|
||||||
|
User,
|
||||||
|
Workspace,
|
||||||
|
Project,
|
||||||
|
Issue,
|
||||||
|
Module,
|
||||||
|
Cycle,
|
||||||
|
CycleIssue,
|
||||||
|
ModuleIssue,
|
||||||
|
Page,
|
||||||
|
WorkspaceMember,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
WORKSPACE_METRICS_LIMIT = 1000
|
||||||
|
FLUSH_TIMEOUT_MILLIS = 30000
|
||||||
|
EXPORT_INTERVAL_MILLIS = 20000
|
||||||
|
|
||||||
|
|
||||||
|
def _grpc_insecure_from_env() -> bool:
    """Decide whether the gRPC exporter should use a plaintext (non-TLS) channel.

    An explicit, non-blank ``OTEL_EXPORTER_OTLP_METRICS_INSECURE`` always wins and
    is true only for the value ``"true"`` (case-insensitive). When it is unset or
    blank, the decision falls back to the scheme of ``OTLP_ENDPOINT``: an
    ``http://`` endpoint means plaintext, anything else means TLS.

    The fallback is needed because the exporter is handed a bare ``host:port``
    target, so the scheme the operator configured is no longer visible to it.
    """
    explicit = (os.environ.get("OTEL_EXPORTER_OTLP_METRICS_INSECURE") or "").strip().lower()
    if explicit:
        return explicit == "true"

    endpoint = (os.environ.get("OTLP_ENDPOINT") or "").strip().lower()
    return endpoint.startswith("http://")


def _create_otlp_metric_exporter():
    """Create the OTLP metric exporter selected by ``OTLP_METRICS_PROTOCOL``.

    ``grpc`` (the default, also used when the variable is unset or empty) builds
    the gRPC exporter against ``get_otlp_grpc_endpoint()``; any other value
    builds the HTTP exporter against ``get_otlp_http_metrics_url()``.

    Returns:
        A gRPC or HTTP ``OTLPMetricExporter`` instance.
    """
    protocol = (os.environ.get("OTLP_METRICS_PROTOCOL") or "grpc").strip().lower()

    if protocol == "grpc":
        # Imported lazily so only the selected exporter package is loaded.
        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
            OTLPMetricExporter as GrpcOTLPMetricExporter,
        )

        return GrpcOTLPMetricExporter(
            endpoint=get_otlp_grpc_endpoint(),
            insecure=_grpc_insecure_from_env(),
        )

    # HTTP fallback
    from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
        OTLPMetricExporter as HttpOTLPMetricExporter,
    )

    return HttpOTLPMetricExporter(endpoint=get_otlp_http_metrics_url())
|
||||||
|
|
||||||
|
|
||||||
|
def _static_gauge_callback(value, attributes):
    """Build a gauge callback that yields one observation of ``value``."""

    def _observe(_options):
        yield metrics.Observation(value, attributes)

    return _observe


def _workspace_gauge_callback(rows, count_key):
    """Build a gauge callback that yields ``row[count_key]`` for every workspace row."""

    def _observe(_options):
        for row in rows:
            yield metrics.Observation(
                row[count_key],
                {
                    "workspace_id": row["workspace_id"],
                    "workspace_slug": row["workspace_slug"],
                    "instance_id": row["instance_id"],
                },
            )

    return _observe


def _count_per_workspace(queryset):
    """Run one grouped COUNT over ``queryset`` and return ``{workspace_id: count}``."""
    grouped = queryset.values("workspace_id").annotate(count=Count("id"))
    return dict(grouped.values_list("workspace_id", "count"))


def _collect_and_push_metrics() -> None:
    """Collect instance and workspace counts and push them as OTLP gauges.

    Returns without exporting when no ``Instance`` row exists or when the
    instance has telemetry disabled. Otherwise a fresh ``MeterProvider`` is built
    for this run, observable gauges are registered for the instance-level totals
    and for up to ``WORKSPACE_METRICS_LIMIT`` workspaces, and the provider is
    flushed and shut down.

    Errors raised while collecting, registering or flushing are logged and
    swallowed. Errors raised while building the exporter, reader or provider
    propagate to the caller.
    """
    instance = Instance.objects.first()
    if instance is None:
        logger.debug("No instance registered, skipping metrics push")
        return
    if not instance.is_telemetry_enabled:
        logger.debug("Telemetry disabled, skipping metrics push")
        return

    # The endpoint is resolved here only for the log messages below; the
    # exporter resolves its own endpoint in _create_otlp_metric_exporter().
    protocol = (os.environ.get("OTLP_METRICS_PROTOCOL") or "grpc").strip().lower()
    if protocol == "grpc":
        export_endpoint = get_otlp_grpc_endpoint()
    else:
        export_endpoint = get_otlp_http_metrics_url()

    instance_id_str = str(instance.instance_id or "")

    # Resource attributes identify this instance to the collector.
    resource = Resource.create(
        {
            "service.name": os.environ.get("SERVICE_NAME", "plane-ce-api"),
            "instance_id": instance_id_str,
            "plane.instance.type": "self-hosted",
        }
    )

    logger.info(f"Configuring OTLP exporter: protocol={protocol}, endpoint={export_endpoint}")
    reader = PeriodicExportingMetricReader(
        _create_otlp_metric_exporter(),
        export_interval_millis=EXPORT_INTERVAL_MILLIS,
    )

    # One provider per run: the gauge callbacks close over counts captured
    # during this run, and the provider is shut down in the finally clause.
    provider = MeterProvider(resource=resource, metric_readers=[reader])

    try:
        meter = provider.get_meter(__name__)

        # (metric name, description, count); the counts are queried here,
        # top to bottom, before any gauge is registered.
        instance_gauges = [
            (
                "plane_instance_users_total",
                "Total number of users in the Plane instance",
                User.objects.filter(is_bot=False).count(),
            ),
            (
                "plane_instance_workspaces_total",
                "Total number of workspaces",
                Workspace.objects.count(),
            ),
            (
                "plane_instance_projects_total",
                "Total number of projects across all workspaces",
                Project.objects.count(),
            ),
            (
                "plane_instance_issues_total",
                "Total number of issues across all projects",
                Issue.objects.count(),
            ),
            (
                "plane_instance_modules_total",
                "Total number of modules",
                Module.objects.count(),
            ),
            (
                "plane_instance_cycles_total",
                "Total number of cycles",
                Cycle.objects.count(),
            ),
            (
                "plane_instance_cycle_issues_total",
                "Total number of issues in cycles",
                CycleIssue.objects.count(),
            ),
            (
                "plane_instance_module_issues_total",
                "Total number of issues in modules",
                ModuleIssue.objects.count(),
            ),
            (
                "plane_instance_pages_total",
                "Total number of pages",
                # Only pages that are both bot-owned and access=1 are left out.
                Page.objects.exclude(owned_by__is_bot=True, access=1).count(),
            ),
        ]

        # Domain comes from WEB_URL (https://plane.acmecorp.com -> plane.acmecorp.com).
        # Scheme-less values get a "//" prefix so urlparse fills netloc instead
        # of treating the host as a path.
        raw_web_url = os.environ.get("WEB_URL", "")
        if not raw_web_url:
            domain = ""
        elif "://" in raw_web_url:
            domain = urlparse(raw_web_url).netloc
        else:
            domain = urlparse("//" + raw_web_url).netloc

        # Attributes attached to every instance-level observation.
        instance_attrs = {
            "instance_id": instance_id_str,
            "instance_name": str(instance.instance_name or ""),
            "current_version": str(instance.current_version or ""),
            "latest_version": str(instance.latest_version or ""),
            "edition": str(instance.edition or ""),
            "domain": domain,
            "is_verified": str(instance.is_verified).lower(),
            "is_setup_done": str(instance.is_setup_done).lower(),
        }

        for gauge_name, gauge_description, gauge_value in instance_gauges:
            meter.create_observable_gauge(
                name=gauge_name,
                description=gauge_description,
                callbacks=[_static_gauge_callback(gauge_value, instance_attrs)],
            )

        # Workspace-level metrics, capped at WORKSPACE_METRICS_LIMIT. Ordering by
        # created_at keeps the slice stable across runs, and each count is one
        # grouped query instead of one query per workspace.
        workspaces = list(Workspace.objects.order_by("created_at")[:WORKSPACE_METRICS_LIMIT])
        workspace_ids = [ws.id for ws in workspaces]

        project_counts = _count_per_workspace(Project.objects.filter(workspace_id__in=workspace_ids))
        issue_counts = _count_per_workspace(Issue.objects.filter(workspace_id__in=workspace_ids))
        module_counts = _count_per_workspace(Module.objects.filter(workspace_id__in=workspace_ids))
        cycle_counts = _count_per_workspace(Cycle.objects.filter(workspace_id__in=workspace_ids))
        member_counts = _count_per_workspace(WorkspaceMember.objects.filter(workspace_id__in=workspace_ids))
        page_counts = _count_per_workspace(
            Page.objects.filter(workspace_id__in=workspace_ids).exclude(owned_by__is_bot=True, access=1)
        )

        # Workspaces absent from a grouped result have no matching rows: report 0.
        workspace_metrics = [
            {
                "instance_id": instance_id_str,
                "workspace_id": str(ws.id),
                "workspace_slug": str(ws.slug),
                "project_count": project_counts.get(ws.id, 0),
                "issue_count": issue_counts.get(ws.id, 0),
                "module_count": module_counts.get(ws.id, 0),
                "cycle_count": cycle_counts.get(ws.id, 0),
                "member_count": member_counts.get(ws.id, 0),
                "page_count": page_counts.get(ws.id, 0),
            }
            for ws in workspaces
        ]

        # (metric name, description, key into each workspace_metrics row)
        workspace_gauges = [
            ("plane_workspace_projects_total", "Number of projects per workspace", "project_count"),
            ("plane_workspace_issues_total", "Number of issues per workspace", "issue_count"),
            ("plane_workspace_modules_total", "Number of modules per workspace", "module_count"),
            ("plane_workspace_cycles_total", "Number of cycles per workspace", "cycle_count"),
            ("plane_workspace_members_total", "Number of members per workspace", "member_count"),
            ("plane_workspace_pages_total", "Number of pages per workspace", "page_count"),
        ]
        for gauge_name, gauge_description, count_key in workspace_gauges:
            meter.create_observable_gauge(
                name=gauge_name,
                description=gauge_description,
                callbacks=[_workspace_gauge_callback(workspace_metrics, count_key)],
            )

        # Synchronous flush, bounded by FLUSH_TIMEOUT_MILLIS, so the export is
        # attempted before this function returns.
        if provider.force_flush(timeout_millis=FLUSH_TIMEOUT_MILLIS):
            logger.info(
                f"Successfully pushed metrics to OTEL collector at {export_endpoint} "
                f"for instance {instance.instance_id}"
            )
        else:
            logger.warning(
                f"Metrics flush timed out for instance {instance.instance_id}, "
                f"some metrics may not have been exported"
            )

    except Exception as e:
        # Deliberately not re-raised: the next scheduled run is the retry.
        logger.exception(f"Error pushing metrics to OTEL collector: {e}")
    finally:
        # Always release the provider and its reader/exporter resources.
        provider.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task
def push_instance_metrics():
    """Celery task that pushes instance metrics to the OTEL collector.

    Delegates to ``_collect_and_push_metrics``, which reports OTLP gauge metrics
    in place of the earlier span-based traces. The Celery beat entry runs it
    every ``METRICS_PUSH_INTERVAL_MINUTES`` minutes (360, i.e. 6 hours, by
    default).

    Any exception is logged with its traceback and swallowed, so the task never
    fails outright; the next scheduled run is the retry.
    """
    logger.debug("Starting push_instance_metrics task")
    try:
        _collect_and_push_metrics()
        logger.debug("Completed push_instance_metrics task")
    except Exception as exc:
        logger.exception(f"Failed to push instance metrics: {exc}")
|
||||||
@@ -1,105 +0,0 @@
|
|||||||
# Copyright (c) 2023-present Plane Software, Inc. and contributors
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
# See the LICENSE file for details.
|
|
||||||
|
|
||||||
# Third party imports
|
|
||||||
from celery import shared_task
|
|
||||||
from opentelemetry import trace
|
|
||||||
|
|
||||||
# Module imports
|
|
||||||
from plane.license.models import Instance
|
|
||||||
from plane.db.models import (
|
|
||||||
User,
|
|
||||||
Workspace,
|
|
||||||
Project,
|
|
||||||
Issue,
|
|
||||||
Module,
|
|
||||||
Cycle,
|
|
||||||
CycleIssue,
|
|
||||||
ModuleIssue,
|
|
||||||
Page,
|
|
||||||
WorkspaceMember,
|
|
||||||
)
|
|
||||||
from plane.utils.telemetry import init_tracer, shutdown_tracer
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
|
||||||
def instance_traces():
|
|
||||||
try:
|
|
||||||
init_tracer()
|
|
||||||
# Check if the instance is registered
|
|
||||||
instance = Instance.objects.first()
|
|
||||||
|
|
||||||
# If instance is None then return
|
|
||||||
if instance is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
if instance.is_telemetry_enabled:
|
|
||||||
# Get the tracer
|
|
||||||
tracer = trace.get_tracer(__name__)
|
|
||||||
# Instance details
|
|
||||||
with tracer.start_as_current_span("instance_details") as span:
|
|
||||||
# Count of all models
|
|
||||||
workspace_count = Workspace.objects.count()
|
|
||||||
user_count = User.objects.count()
|
|
||||||
project_count = Project.objects.count()
|
|
||||||
issue_count = Issue.objects.count()
|
|
||||||
module_count = Module.objects.count()
|
|
||||||
cycle_count = Cycle.objects.count()
|
|
||||||
cycle_issue_count = CycleIssue.objects.count()
|
|
||||||
module_issue_count = ModuleIssue.objects.count()
|
|
||||||
page_count = Page.objects.count()
|
|
||||||
|
|
||||||
# Set span attributes
|
|
||||||
span.set_attribute("instance_id", instance.instance_id)
|
|
||||||
span.set_attribute("instance_name", instance.instance_name)
|
|
||||||
span.set_attribute("current_version", instance.current_version)
|
|
||||||
span.set_attribute("latest_version", instance.latest_version)
|
|
||||||
span.set_attribute("is_telemetry_enabled", instance.is_telemetry_enabled)
|
|
||||||
span.set_attribute("is_support_required", instance.is_support_required)
|
|
||||||
span.set_attribute("is_setup_done", instance.is_setup_done)
|
|
||||||
span.set_attribute("is_signup_screen_visited", instance.is_signup_screen_visited)
|
|
||||||
span.set_attribute("is_verified", instance.is_verified)
|
|
||||||
span.set_attribute("edition", instance.edition)
|
|
||||||
span.set_attribute("domain", instance.domain)
|
|
||||||
span.set_attribute("is_test", instance.is_test)
|
|
||||||
span.set_attribute("user_count", user_count)
|
|
||||||
span.set_attribute("workspace_count", workspace_count)
|
|
||||||
span.set_attribute("project_count", project_count)
|
|
||||||
span.set_attribute("issue_count", issue_count)
|
|
||||||
span.set_attribute("module_count", module_count)
|
|
||||||
span.set_attribute("cycle_count", cycle_count)
|
|
||||||
span.set_attribute("cycle_issue_count", cycle_issue_count)
|
|
||||||
span.set_attribute("module_issue_count", module_issue_count)
|
|
||||||
span.set_attribute("page_count", page_count)
|
|
||||||
|
|
||||||
# Workspace details
|
|
||||||
for workspace in Workspace.objects.all():
|
|
||||||
# Count of all models
|
|
||||||
project_count = Project.objects.filter(workspace=workspace).count()
|
|
||||||
issue_count = Issue.objects.filter(workspace=workspace).count()
|
|
||||||
module_count = Module.objects.filter(workspace=workspace).count()
|
|
||||||
cycle_count = Cycle.objects.filter(workspace=workspace).count()
|
|
||||||
cycle_issue_count = CycleIssue.objects.filter(workspace=workspace).count()
|
|
||||||
module_issue_count = ModuleIssue.objects.filter(workspace=workspace).count()
|
|
||||||
page_count = Page.objects.filter(workspace=workspace).count()
|
|
||||||
member_count = WorkspaceMember.objects.filter(workspace=workspace).count()
|
|
||||||
|
|
||||||
# Set span attributes
|
|
||||||
with tracer.start_as_current_span("workspace_details") as span:
|
|
||||||
span.set_attribute("instance_id", instance.instance_id)
|
|
||||||
span.set_attribute("workspace_id", str(workspace.id))
|
|
||||||
span.set_attribute("workspace_slug", workspace.slug)
|
|
||||||
span.set_attribute("project_count", project_count)
|
|
||||||
span.set_attribute("issue_count", issue_count)
|
|
||||||
span.set_attribute("module_count", module_count)
|
|
||||||
span.set_attribute("cycle_count", cycle_count)
|
|
||||||
span.set_attribute("cycle_issue_count", cycle_issue_count)
|
|
||||||
span.set_attribute("module_issue_count", module_issue_count)
|
|
||||||
span.set_attribute("page_count", page_count)
|
|
||||||
span.set_attribute("member_count", member_count)
|
|
||||||
|
|
||||||
return
|
|
||||||
finally:
|
|
||||||
# Shutdown the tracer
|
|
||||||
shutdown_tracer()
|
|
||||||
@@ -15,7 +15,7 @@ from django.utils import timezone
|
|||||||
|
|
||||||
# Module imports
|
# Module imports
|
||||||
from plane.license.models import Instance, InstanceEdition
|
from plane.license.models import Instance, InstanceEdition
|
||||||
from plane.license.bgtasks.tracer import instance_traces
|
from plane.license.bgtasks.telemetry_metrics import push_instance_metrics
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
@@ -86,7 +86,7 @@ class Command(BaseCommand):
|
|||||||
instance.edition = InstanceEdition.PLANE_COMMUNITY.value
|
instance.edition = InstanceEdition.PLANE_COMMUNITY.value
|
||||||
instance.save()
|
instance.save()
|
||||||
|
|
||||||
# Call the instance traces task
|
# Push instance metrics on registration
|
||||||
instance_traces.delay()
|
push_instance_metrics.delay()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -321,7 +321,7 @@ CELERY_IMPORTS = (
|
|||||||
"plane.bgtasks.file_asset_task",
|
"plane.bgtasks.file_asset_task",
|
||||||
"plane.bgtasks.email_notification_task",
|
"plane.bgtasks.email_notification_task",
|
||||||
"plane.bgtasks.cleanup_task",
|
"plane.bgtasks.cleanup_task",
|
||||||
"plane.license.bgtasks.tracer",
|
"plane.license.bgtasks.telemetry_metrics",
|
||||||
# management tasks
|
# management tasks
|
||||||
"plane.bgtasks.dummy_data_task",
|
"plane.bgtasks.dummy_data_task",
|
||||||
# issue version tasks
|
# issue version tasks
|
||||||
|
|||||||
@@ -0,0 +1,59 @@
|
|||||||
|
# Copyright (c) 2023-present Plane Software, Inc. and contributors
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
# See the LICENSE file for details.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Shared OTLP endpoint helpers so metrics and traces use the same collector
|
||||||
|
when both are enabled. One URL (OTLP_ENDPOINT) is enough: same as traces
|
||||||
|
(e.g. https://telemetry.plane.so or https://telemetry.plane.town behind
|
||||||
|
nginx ingress with gRPC backend).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
# Ports used when the endpoint carries no explicit (or no usable) port:
# https -> 443 (ingress), anything else -> 4317 (OTLP gRPC default).
OTLP_GRPC_DEFAULT_PORT = "4317"
HTTPS_DEFAULT_PORT = "443"

_DEFAULT_OTLP_ENDPOINT = "https://telemetry.plane.so"


def _configured_otlp_endpoint() -> str:
    """Return ``OTLP_ENDPOINT`` from the environment, stripped of whitespace.

    An unset, empty or whitespace-only value falls back to the default
    collector URL, so a blank variable behaves the same as a missing one.
    """
    return (os.environ.get("OTLP_ENDPOINT") or "").strip() or _DEFAULT_OTLP_ENDPOINT


def grpc_endpoint_from_url(url: str) -> str:
    """Derive the gRPC ``host:port`` target from an OTLP endpoint URL.

    Examples:
        - https://telemetry.plane.so   -> telemetry.plane.so:443  (nginx ingress)
        - https://telemetry.plane.town -> telemetry.plane.town:443 (dev)
        - telemetry.plane.so:4317      -> telemetry.plane.so:4317 (scheme-less with port)
        - telemetry.plane.so           -> telemetry.plane.so:4317 (scheme-less, default gRPC port)
        - http://[::1]:4317            -> [::1]:4317 (IPv6 literal keeps its brackets)

    Args:
        url: Endpoint URL, with or without a scheme. ``None`` and blank strings
            are accepted and resolve to the default host.

    Returns:
        ``host:port``. A valid explicit port is always preserved. A port that is
        non-numeric or out of range is ignored in favour of the scheme default
        (443 for https, 4317 otherwise) instead of raising.
    """
    url = (url or "").strip()

    # urlparse needs a scheme (or a leading "//") to populate hostname/netloc.
    # A bare "host:port" would otherwise be read as scheme="host", path="port".
    if "://" not in url:
        url = "//" + url
    parsed = urlparse(url)

    host = parsed.hostname or "telemetry.plane.so"
    if ":" in host:
        # .hostname strips the brackets of an IPv6 literal. Without them the
        # address and the port separator are indistinguishable in "host:port".
        host = f"[{host}]"

    try:
        explicit_port = parsed.port
    except ValueError:
        # .port raises on a non-numeric or out-of-range port; treat as absent.
        explicit_port = None

    if explicit_port is not None:
        port = str(explicit_port)
    elif parsed.scheme == "https":
        port = HTTPS_DEFAULT_PORT
    else:
        port = OTLP_GRPC_DEFAULT_PORT
    return f"{host}:{port}"


def get_otlp_grpc_endpoint() -> str:
    """Return the gRPC endpoint (``host:port``) derived from ``OTLP_ENDPOINT``.

    Falls back to the default collector when the variable is unset or blank, so
    the same URL can serve both the gRPC and the HTTP exporter (for example a
    collector behind an nginx ingress with a gRPC backend on 443).
    """
    return grpc_endpoint_from_url(_configured_otlp_endpoint())


def get_otlp_http_metrics_url() -> str:
    """Return the HTTP URL for OTLP metrics (``OTLP_ENDPOINT`` + ``/v1/metrics``).

    Falls back to the default collector when the variable is unset or blank.
    Trailing slashes on the base URL are dropped before the path is appended.
    """
    # NOTE(review): a scheme-less OTLP_ENDPOINT is passed through unchanged, so
    # the result has no scheme either; confirm the HTTP exporter accepts that.
    base = _configured_otlp_endpoint()
    return f"{base.rstrip('/')}/v1/metrics"
|
||||||
@@ -1,62 +0,0 @@
|
|||||||
# Copyright (c) 2023-present Plane Software, Inc. and contributors
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
# See the LICENSE file for details.
|
|
||||||
|
|
||||||
# Python imports
|
|
||||||
import os
|
|
||||||
import atexit
|
|
||||||
|
|
||||||
# Third party imports
|
|
||||||
from opentelemetry import trace
|
|
||||||
from opentelemetry.sdk.trace import TracerProvider
|
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
||||||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
|
||||||
from opentelemetry.sdk.resources import Resource
|
|
||||||
from opentelemetry.instrumentation.django import DjangoInstrumentor
|
|
||||||
|
|
||||||
# Global variable to track initialization
|
|
||||||
_TRACER_PROVIDER = None
|
|
||||||
|
|
||||||
|
|
||||||
def init_tracer():
|
|
||||||
"""Initialize OpenTelemetry with proper shutdown handling"""
|
|
||||||
global _TRACER_PROVIDER
|
|
||||||
|
|
||||||
# If already initialized, return existing provider
|
|
||||||
if _TRACER_PROVIDER is not None:
|
|
||||||
return _TRACER_PROVIDER
|
|
||||||
|
|
||||||
# Configure the tracer provider
|
|
||||||
service_name = os.environ.get("SERVICE_NAME", "plane-ce-api")
|
|
||||||
resource = Resource.create({"service.name": service_name})
|
|
||||||
tracer_provider = TracerProvider(resource=resource)
|
|
||||||
|
|
||||||
# Set as global tracer provider
|
|
||||||
trace.set_tracer_provider(tracer_provider)
|
|
||||||
|
|
||||||
# Configure the OTLP exporter
|
|
||||||
otel_endpoint = os.environ.get("OTLP_ENDPOINT", "https://telemetry.plane.so")
|
|
||||||
otlp_exporter = OTLPSpanExporter(endpoint=otel_endpoint)
|
|
||||||
span_processor = BatchSpanProcessor(otlp_exporter)
|
|
||||||
tracer_provider.add_span_processor(span_processor)
|
|
||||||
|
|
||||||
# Initialize Django instrumentation
|
|
||||||
DjangoInstrumentor().instrument()
|
|
||||||
|
|
||||||
# Store provider globally
|
|
||||||
_TRACER_PROVIDER = tracer_provider
|
|
||||||
|
|
||||||
# Register shutdown handler
|
|
||||||
atexit.register(shutdown_tracer)
|
|
||||||
|
|
||||||
return tracer_provider
|
|
||||||
|
|
||||||
|
|
||||||
def shutdown_tracer():
|
|
||||||
"""Shutdown OpenTelemetry tracers and processors"""
|
|
||||||
global _TRACER_PROVIDER
|
|
||||||
|
|
||||||
if _TRACER_PROVIDER is not None:
|
|
||||||
if hasattr(_TRACER_PROVIDER, "shutdown"):
|
|
||||||
_TRACER_PROVIDER.shutdown()
|
|
||||||
_TRACER_PROVIDER = None
|
|
||||||
@@ -67,6 +67,7 @@ opentelemetry-api==1.28.1
|
|||||||
opentelemetry-sdk==1.28.1
|
opentelemetry-sdk==1.28.1
|
||||||
opentelemetry-instrumentation-django==0.49b1
|
opentelemetry-instrumentation-django==0.49b1
|
||||||
opentelemetry-exporter-otlp==1.28.1
|
opentelemetry-exporter-otlp==1.28.1
|
||||||
|
opentelemetry-exporter-otlp-proto-grpc==1.28.1
|
||||||
# OpenAPI Specification
|
# OpenAPI Specification
|
||||||
drf-spectacular==0.28.0
|
drf-spectacular==0.28.0
|
||||||
# html sanitizer
|
# html sanitizer
|
||||||
|
|||||||
Reference in New Issue
Block a user