Practical code examples for working with the WIA-DATA-014 standard across different platforms and languages.
Collecting and storing temperature data from IoT sensors.
from influxdb_client import InfluxDBClient, Point
from datetime import datetime
# Initialize client
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")
write_api = client.write_api()
# Write temperature data
point = Point("temperature") \
.tag("location", "warehouse-a") \
.tag("sensor_id", "DHT22-001") \
.field("value", 23.5) \
.field("humidity", 65.2) \
.time(datetime.utcnow())
write_api.write(bucket="sensors", record=point)
# Query recent data
query = '''
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == "warehouse-a")
|> aggregateWindow(every: 5m, fn: mean)
'''
query_api = client.query_api()
result = query_api.query(query)
for table in result:
for record in table.records:
print(f"{record.get_time()}: {record.get_value()}°C")
Monitor server CPU usage and detect anomalies.
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
const os = require('os');
const client = new InfluxDB({
url: 'http://localhost:8086',
token: 'your-token'
});
const writeApi = client.getWriteApi('wia', 'monitoring');
// Collect CPU metrics every 10 seconds
setInterval(() => {
const cpuUsage = os.loadavg()[0] * 10; // Simplified calculation
const point = new Point('cpu')
.tag('host', os.hostname())
.tag('platform', os.platform())
.floatField('usage_percent', cpuUsage)
.floatField('cores', os.cpus().length);
writeApi.writePoint(point);
// Flush every 5 points
writeApi.flush();
}, 10000);
// Query and detect high CPU usage
async function detectHighCPU() {
const queryApi = client.getQueryApi('wia');
const query = `
from(bucket: "monitoring")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_percent")
|> filter(fn: (r) => r._value > 80.0)
`;
const results = await queryApi.collectRows(query);
if (results.length > 0) {
console.log('⚠️ High CPU detected:', results);
// Trigger alert
}
}
setInterval(detectHighCPU, 60000); // Check every minute
Store and analyze financial time-series data.
import pandas as pd
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")
# Write stock prices
def write_stock_data(symbol, price, volume):
write_api = client.write_api()
point = Point("stock_price") \
.tag("symbol", symbol) \
.tag("exchange", "NYSE") \
.field("price", price) \
.field("volume", volume) \
.time(datetime.utcnow())
write_api.write(bucket="finance", record=point)
# Calculate moving average
def get_moving_average(symbol, days=7):
query = f'''
from(bucket: "finance")
|> range(start: -{days}d)
|> filter(fn: (r) => r._measurement == "stock_price")
|> filter(fn: (r) => r.symbol == "{symbol}")
|> filter(fn: (r) => r._field == "price")
|> movingAverage(n: 10)
'''
query_api = client.query_api()
df = query_api.query_data_frame(query)
return df
# Detect price anomalies using z-score
def detect_price_anomalies(symbol, threshold=3.0):
query = f'''
import "contrib/anaisdg/anomalydetection"
from(bucket: "finance")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "stock_price")
|> filter(fn: (r) => r.symbol == "{symbol}")
|> anomalydetection.mad(threshold: {threshold})
'''
query_api = client.query_api()
result = query_api.query(query)
anomalies = []
for table in result:
for record in table.records:
if record.get_value() > 0: # Anomaly detected
anomalies.append({
'time': record.get_time(),
'value': record.get_field('price')
})
return anomalies
Monitor network traffic with TimescaleDB.
package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
type NetworkMetric struct {
Timestamp time.Time
Interface string
BytesIn int64
BytesOut int64
PacketsIn int64
PacketsOut int64
}
func main() {
connStr := "postgresql://user:pass@localhost/network_metrics?sslmode=disable"
db, err := sql.Open("postgres", connStr)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Create hypertable
createTable := `
CREATE TABLE IF NOT EXISTS network_traffic (
time TIMESTAMPTZ NOT NULL,
interface TEXT NOT NULL,
bytes_in BIGINT,
bytes_out BIGINT,
packets_in BIGINT,
packets_out BIGINT
);
SELECT create_hypertable('network_traffic', 'time', if_not_exists => TRUE);
`
_, err = db.Exec(createTable)
if err != nil {
log.Fatal(err)
}
// Insert metrics
metric := NetworkMetric{
Timestamp: time.Now(),
Interface: "eth0",
BytesIn: 1024000,
BytesOut: 512000,
PacketsIn: 1500,
PacketsOut: 800,
}
insertQuery := `
INSERT INTO network_traffic (time, interface, bytes_in, bytes_out, packets_in, packets_out)
VALUES ($1, $2, $3, $4, $5, $6)
`
_, err = db.Exec(insertQuery, metric.Timestamp, metric.Interface,
metric.BytesIn, metric.BytesOut, metric.PacketsIn, metric.PacketsOut)
// Query with time-bucket aggregation
aggregateQuery := `
SELECT
time_bucket('5 minutes', time) AS bucket,
interface,
AVG(bytes_in) as avg_bytes_in,
MAX(bytes_in) as max_bytes_in
FROM network_traffic
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, interface
ORDER BY bucket DESC;
`
rows, err := db.Query(aggregateQuery)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var bucket time.Time
var iface string
var avgBytesIn, maxBytesIn float64
err := rows.Scan(&bucket, &iface, &avgBytesIn, &maxBytesIn)
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s | %s | Avg: %.2f | Max: %.2f\n",
bucket.Format(time.RFC3339), iface, avgBytesIn, maxBytesIn)
}
}
Predict future values using Facebook Prophet.
import pandas as pd
from prophet import Prophet
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")
# Fetch historical data
query = '''
from(bucket: "sensors")
|> range(start: -90d)
|> filter(fn: (r) => r._measurement == "energy_consumption")
|> filter(fn: (r) => r._field == "kwh")
|> aggregateWindow(every: 1h, fn: mean)
'''
query_api = client.query_api()
df = query_api.query_data_frame(query)
# Prepare data for Prophet
prophet_df = df.rename(columns={'_time': 'ds', '_value': 'y'})
prophet_df = prophet_df[['ds', 'y']]
# Train model
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=True
)
model.fit(prophet_df)
# Make predictions for next 7 days
future = model.make_future_dataframe(periods=7*24, freq='H')
forecast = model.predict(future)
# Visualize
import matplotlib.pyplot as plt
fig = model.plot(forecast)
plt.title('Energy Consumption Forecast')
plt.xlabel('Date')
plt.ylabel('kWh')
plt.savefig('forecast.png')
# Write forecasts back to InfluxDB
write_api = client.write_api()
for _, row in forecast.iterrows():
point = Point("energy_forecast") \
.field("predicted", row['yhat']) \
.field("lower_bound", row['yhat_lower']) \
.field("upper_bound", row['yhat_upper']) \
.time(row['ds'])
write_api.write(bucket="forecasts", record=point)
Detect anomalies using Isolation Forest algorithm.
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from influxdb_client import InfluxDBClient, Point
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="wia")
# Fetch sensor data
query = '''
from(bucket: "iot")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "vibration")
|> filter(fn: (r) => r._field == "amplitude")
'''
query_api = client.query_api()
df = query_api.query_data_frame(query)
# Prepare features
X = df[['_value']].values
# Train Isolation Forest
clf = IsolationForest(contamination=0.05, random_state=42)
predictions = clf.fit_predict(X)
# Add predictions to dataframe
df['anomaly'] = predictions
df['is_anomaly'] = df['anomaly'] == -1
# Write anomalies back to database
write_api = client.write_api()
anomalies = df[df['is_anomaly']]
for _, row in anomalies.iterrows():
point = Point("vibration_anomaly") \
.tag("sensor_id", row.get('sensor_id', 'unknown')) \
.field("value", row['_value']) \
.field("severity", "high") \
.time(row['_time'])
write_api.write(bucket="alerts", record=point)
print(f"Detected {len(anomalies)} anomalies out of {len(df)} data points")
# Visualize
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.scatter(df.index, df['_value'], c=df['anomaly'], cmap='RdYlGn', alpha=0.6)
plt.xlabel('Index')
plt.ylabel('Vibration Amplitude')
plt.title('Anomaly Detection - Isolation Forest')
plt.colorbar(label='Normal (1) / Anomaly (-1)')
plt.savefig('anomalies.png')
Configure continuous queries for automatic downsampling.
-- Create retention policies
CREATE RETENTION POLICY "raw_data" ON "sensors" DURATION 7d REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "downsampled_5m" ON "sensors" DURATION 90d REPLICATION 1
CREATE RETENTION POLICY "downsampled_1h" ON "sensors" DURATION 365d REPLICATION 1
-- Continuous query: 5-minute averages
CREATE CONTINUOUS QUERY "cq_5m_avg" ON "sensors"
BEGIN
SELECT mean("value") AS "value"
INTO "downsampled_5m"."temperature"
FROM "raw_data"."temperature"
GROUP BY time(5m), *
END
-- Continuous query: 1-hour averages
CREATE CONTINUOUS QUERY "cq_1h_avg" ON "sensors"
BEGIN
SELECT mean("value") AS "value"
INTO "downsampled_1h"."temperature"
FROM "downsampled_5m"."temperature"
GROUP BY time(1h), *
END
-- Query different resolutions
SELECT mean("value") FROM "raw_data"."temperature" WHERE time > now() - 1h
SELECT mean("value") FROM "downsampled_5m"."temperature" WHERE time > now() - 7d
SELECT mean("value") FROM "downsampled_1h"."temperature" WHERE time > now() - 30d