💡 Time-Series Data Examples

Practical code examples for working with the WIA-DATA-014 standard across different platforms and languages.

1. IoT Temperature Monitoring

Python

Collecting and storing temperature data from IoT sensors.

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timezone

# Initialize client
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")

# SYNCHRONOUS guarantees the point is flushed during write(); the default
# batching mode buffers in the background and can silently lose data when
# a short script exits before the flush interval fires.
write_api = client.write_api(write_options=SYNCHRONOUS)

# Write temperature data with a timezone-aware UTC timestamp
# (datetime.utcnow() returns a naive datetime and is deprecated since 3.12)
point = Point("temperature") \
    .tag("location", "warehouse-a") \
    .tag("sensor_id", "DHT22-001") \
    .field("value", 23.5) \
    .field("humidity", 65.2) \
    .time(datetime.now(timezone.utc))

write_api.write(bucket="sensors", record=point)

# Query the last hour, averaged into 5-minute windows
query = '''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "warehouse-a")
  |> aggregateWindow(every: 5m, fn: mean)
'''

query_api = client.query_api()
result = query_api.query(query)

for table in result:
    for record in table.records:
        print(f"{record.get_time()}: {record.get_value()}°C")

# Release the underlying HTTP session
client.close()

2. Server CPU Monitoring

Node.js

Monitor server CPU usage and detect anomalies.

const { InfluxDB, Point } = require('@influxdata/influxdb-client');
const os = require('os');

const client = new InfluxDB({
  url: 'http://localhost:8086',
  token: 'your-token'
});

const writeApi = client.getWriteApi('wia', 'monitoring');

// Collect CPU metrics every 10 seconds
setInterval(() => {
  // 1-minute load average scaled into a rough percentage.
  // Simplified: a per-core figure would divide by os.cpus().length.
  const cpuUsage = os.loadavg()[0] * 10;

  const point = new Point('cpu')
    .tag('host', os.hostname())
    .tag('platform', os.platform())
    .floatField('usage_percent', cpuUsage)
    // Core count is a whole number — store it as an integer field,
    // not a float.
    .intField('cores', os.cpus().length);

  writeApi.writePoint(point);

  // Flush after every sample so each point is visible immediately
  // (the previous comment claimed "every 5 points", which the code
  // never did).
  writeApi.flush();
}, 10000);

// Query the last 15 minutes and report any samples above 80% usage
async function detectHighCPU() {
  const queryApi = client.getQueryApi('wia');

  const query = `
    from(bucket: "monitoring")
      |> range(start: -15m)
      |> filter(fn: (r) => r._measurement == "cpu")
      |> filter(fn: (r) => r._field == "usage_percent")
      |> filter(fn: (r) => r._value > 80.0)
  `;

  const results = await queryApi.collectRows(query);

  if (results.length > 0) {
    console.log('⚠️ High CPU detected:', results);
    // Trigger alert
  }
}

setInterval(detectHighCPU, 60000); // Check every minute

3. Stock Price Analysis

Python

Store and analyze financial time-series data.

import pandas as pd
# BUG FIX: Point was used below but never imported (NameError)
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timezone

client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")

# Write stock prices
def write_stock_data(symbol, price, volume):
    """Write one NYSE price/volume point for `symbol` to the finance bucket."""
    # SYNCHRONOUS flushes during write(); the default batching mode can
    # lose the point if the process exits before the background flush.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    point = Point("stock_price") \
        .tag("symbol", symbol) \
        .tag("exchange", "NYSE") \
        .field("price", price) \
        .field("volume", volume) \
        .time(datetime.now(timezone.utc))  # aware UTC; utcnow() is deprecated
    write_api.write(bucket="finance", record=point)

# Calculate moving average
def get_moving_average(symbol, days=7, window=10):
    """Return a DataFrame with the `window`-point moving average of
    `symbol` prices over the last `days` days.

    `window` was previously hard-coded to 10; the default preserves
    the old behavior.
    """
    query = f'''
    from(bucket: "finance")
      |> range(start: -{days}d)
      |> filter(fn: (r) => r._measurement == "stock_price")
      |> filter(fn: (r) => r.symbol == "{symbol}")
      |> filter(fn: (r) => r._field == "price")
      |> movingAverage(n: {window})
    '''

    query_api = client.query_api()
    df = query_api.query_data_frame(query)
    return df

# Detect price anomalies using median absolute deviation (MAD)
def detect_price_anomalies(symbol, threshold=3.0):
    """Return [{'time', 'value'}] for points flagged by the MAD detector."""
    query = f'''
    import "contrib/anaisdg/anomalydetection"

    from(bucket: "finance")
      |> range(start: -30d)
      |> filter(fn: (r) => r._measurement == "stock_price")
      |> filter(fn: (r) => r.symbol == "{symbol}")
      |> anomalydetection.mad(threshold: {threshold})
    '''

    query_api = client.query_api()
    result = query_api.query(query)

    anomalies = []
    for table in result:
        for record in table.records:
            if record.get_value() > 0:  # flagged as anomalous
                anomalies.append({
                    'time': record.get_time(),
                    # BUG FIX: get_field() takes no arguments and returns
                    # the field *name*; the anomalous value is get_value().
                    'value': record.get_value()
                })

    return anomalies

4. Real-Time Network Monitoring

Go

Monitor network traffic with TimescaleDB.

package main

import (
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/lib/pq"
)

// NetworkMetric is one sample of per-interface traffic counters.
type NetworkMetric struct {
    Timestamp  time.Time
    Interface  string
    BytesIn    int64
    BytesOut   int64
    PacketsIn  int64
    PacketsOut int64
}

func main() {
    connStr := "postgresql://user:pass@localhost/network_metrics?sslmode=disable"
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Create the table and promote it to a TimescaleDB hypertable
    // partitioned on the time column (both statements are no-ops if
    // they already exist).
    createTable := `
    CREATE TABLE IF NOT EXISTS network_traffic (
        time        TIMESTAMPTZ NOT NULL,
        interface   TEXT NOT NULL,
        bytes_in    BIGINT,
        bytes_out   BIGINT,
        packets_in  BIGINT,
        packets_out BIGINT
    );

    SELECT create_hypertable('network_traffic', 'time', if_not_exists => TRUE);
    `
    if _, err = db.Exec(createTable); err != nil {
        log.Fatal(err)
    }

    // Insert one sample row
    metric := NetworkMetric{
        Timestamp:  time.Now(),
        Interface:  "eth0",
        BytesIn:    1024000,
        BytesOut:   512000,
        PacketsIn:  1500,
        PacketsOut: 800,
    }

    insertQuery := `
    INSERT INTO network_traffic (time, interface, bytes_in, bytes_out, packets_in, packets_out)
    VALUES ($1, $2, $3, $4, $5, $6)
    `
    // BUG FIX: the insert error was previously discarded (err was
    // overwritten by the next query without ever being checked).
    if _, err = db.Exec(insertQuery, metric.Timestamp, metric.Interface,
        metric.BytesIn, metric.BytesOut, metric.PacketsIn, metric.PacketsOut); err != nil {
        log.Fatal(err)
    }

    // Aggregate the last hour into 5-minute buckets per interface
    aggregateQuery := `
    SELECT
        time_bucket('5 minutes', time) AS bucket,
        interface,
        AVG(bytes_in) as avg_bytes_in,
        MAX(bytes_in) as max_bytes_in
    FROM network_traffic
    WHERE time > NOW() - INTERVAL '1 hour'
    GROUP BY bucket, interface
    ORDER BY bucket DESC;
    `

    rows, err := db.Query(aggregateQuery)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    for rows.Next() {
        var bucket time.Time
        var iface string
        var avgBytesIn, maxBytesIn float64

        if err := rows.Scan(&bucket, &iface, &avgBytesIn, &maxBytesIn); err != nil {
            log.Fatal(err)
        }

        fmt.Printf("%s | %s | Avg: %.2f | Max: %.2f\n",
            bucket.Format(time.RFC3339), iface, avgBytesIn, maxBytesIn)
    }
    // Surface any error that terminated row iteration early
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}

5. Forecasting with Prophet

Python

Predict future values using Facebook Prophet.

import pandas as pd
from prophet import Prophet
# BUG FIX: Point is used below but was never imported (NameError)
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")

# Fetch 90 days of energy consumption, averaged to hourly resolution
query = '''
from(bucket: "sensors")
  |> range(start: -90d)
  |> filter(fn: (r) => r._measurement == "energy_consumption")
  |> filter(fn: (r) => r._field == "kwh")
  |> aggregateWindow(every: 1h, fn: mean)
'''

query_api = client.query_api()
df = query_api.query_data_frame(query)

# Prophet expects exactly two columns: ds (timestamp) and y (value)
prophet_df = df.rename(columns={'_time': 'ds', '_value': 'y'})
prophet_df = prophet_df[['ds', 'y']]

# Train model with all three seasonality components enabled
model = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=True,
    daily_seasonality=True
)
model.fit(prophet_df)

# Make predictions for the next 7 days at hourly resolution
# (lowercase 'h': the uppercase 'H' alias is deprecated in recent pandas)
future = model.make_future_dataframe(periods=7 * 24, freq='h')
forecast = model.predict(future)

# Visualize
import matplotlib.pyplot as plt

fig = model.plot(forecast)
plt.title('Energy Consumption Forecast')
plt.xlabel('Date')
plt.ylabel('kWh')
plt.savefig('forecast.png')

# Write forecasts (with uncertainty bounds) back to InfluxDB.
# NOTE: forecast also contains in-sample rows; slice forecast.tail(7 * 24)
# if only the future horizon should be stored.
# SYNCHRONOUS flushes each write; the default batching mode could lose
# points if the script exits before the background flush runs.
write_api = client.write_api(write_options=SYNCHRONOUS)
for _, row in forecast.iterrows():
    point = Point("energy_forecast") \
        .field("predicted", row['yhat']) \
        .field("lower_bound", row['yhat_lower']) \
        .field("upper_bound", row['yhat_upper']) \
        .time(row['ds'])
    write_api.write(bucket="forecasts", record=point)

client.close()

6. Anomaly Detection with Machine Learning

Python

Detect anomalies using Isolation Forest algorithm.

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")

# Fetch the last week of vibration amplitude readings
query = '''
from(bucket: "iot")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "vibration")
  |> filter(fn: (r) => r._field == "amplitude")
'''

query_api = client.query_api()
df = query_api.query_data_frame(query)

# Single-feature matrix: the raw amplitude values
X = df[['_value']].values

# Train Isolation Forest; contamination=0.05 assumes ~5% of the
# data is anomalous. fit_predict returns 1 = normal, -1 = anomaly.
clf = IsolationForest(contamination=0.05, random_state=42)
predictions = clf.fit_predict(X)

# Add predictions to dataframe
df['anomaly'] = predictions
df['is_anomaly'] = df['anomaly'] == -1

# Write anomalies back to the alerts bucket.
# SYNCHRONOUS flushes each write; the default batching mode could drop
# alert points if the script exits before the background flush runs.
write_api = client.write_api(write_options=SYNCHRONOUS)
anomalies = df[df['is_anomaly']]

for _, row in anomalies.iterrows():
    point = Point("vibration_anomaly") \
        .tag("sensor_id", row.get('sensor_id', 'unknown')) \
        .field("value", row['_value']) \
        .field("severity", "high") \
        .time(row['_time'])
    write_api.write(bucket="alerts", record=point)

print(f"Detected {len(anomalies)} anomalies out of {len(df)} data points")

# Visualize
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.scatter(df.index, df['_value'], c=df['anomaly'], cmap='RdYlGn', alpha=0.6)
plt.xlabel('Index')
plt.ylabel('Vibration Amplitude')
plt.title('Anomaly Detection - Isolation Forest')
plt.colorbar(label='Normal (1) / Anomaly (-1)')
plt.savefig('anomalies.png')

7. Downsampling and Retention

InfluxQL

Configure continuous queries for automatic downsampling.

-- Create retention policies
-- raw_data (the DEFAULT policy) keeps full-resolution points for 7 days;
-- the downsampled tiers keep 5-minute data 90 days and hourly data 1 year.
CREATE RETENTION POLICY "raw_data" ON "sensors" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "downsampled_5m" ON "sensors" DURATION 90d REPLICATION 1
CREATE RETENTION POLICY "downsampled_1h" ON "sensors" DURATION 365d REPLICATION 1

-- Continuous query: 5-minute averages
-- Rolls raw temperature points into the 5-minute tier;
-- GROUP BY time(5m), * preserves every tag on the rolled-up series.
CREATE CONTINUOUS QUERY "cq_5m_avg" ON "sensors"
BEGIN
  SELECT mean("value") AS "value"
  INTO "downsampled_5m"."temperature"
  FROM "raw_data"."temperature"
  GROUP BY time(5m), *
END

-- Continuous query: 1-hour averages
-- Cascades off the 5-minute tier (not raw data), so it keeps working
-- after raw points expire from the 7-day policy.
CREATE CONTINUOUS QUERY "cq_1h_avg" ON "sensors"
BEGIN
  SELECT mean("value") AS "value"
  INTO "downsampled_1h"."temperature"
  FROM "downsampled_5m"."temperature"
  GROUP BY time(1h), *
END

-- Query different resolutions
-- Choose the retention policy whose resolution matches the time window.
SELECT mean("value") FROM "raw_data"."temperature" WHERE time > now() - 1h
SELECT mean("value") FROM "downsampled_5m"."temperature" WHERE time > now() - 7d
SELECT mean("value") FROM "downsampled_1h"."temperature" WHERE time > now() - 30d