from pywps import Process, LiteralInput, LiteralOutput, UOM, ComplexInput, ComplexOutput # noqa
from pywps.app.Common import Metadata
from pywps import FORMATS, Format
from pathlib import Path
import logging
import pandas as pd
from hawk.analysis import CausalAnalysis
import os
LOGGER = logging.getLogger("PYWPS")
FORMAT_PNG = Format("image/png", extension=".png", encoding="base64")
FORMAT_PDF = Format("application/pdf", extension=".pdf", encoding="utf-8")
FORMAT_PICKLE = Format("application/octet-stream", extension=".pkl")
[docs]
class Causal(Process):
def __init__(self):
inputs = [
ComplexInput(
"dataset_train",
"Train Dataset",
abstract="Please add the train csv file here.",
default="https://raw.githubusercontent.com/climateintelligence/hawk/main/hawk/demo/Ticino_train.csv",
min_occurs=1,
max_occurs=1,
supported_formats=[FORMATS.CSV],
),
ComplexInput(
"dataset_test",
"Test Dataset",
abstract="Please add the test csv file here.",
default="https://raw.githubusercontent.com/climateintelligence/hawk/main/hawk/demo/Ticino_test.csv",
min_occurs=1,
max_occurs=1,
supported_formats=[FORMATS.CSV],
),
LiteralInput(
"target_column_name",
"Target Column Name",
data_type="string",
abstract="Please enter the case-specific name of the target variable in the dataframe.",
default="target",
),
LiteralInput(
"pcmci_test_choice",
"PCMCI Test Choice",
data_type="string",
abstract="Choose the independence test to be used in PCMCI.",
default="ParCorr",
allowed_values=[
"ParCorr",
"CMIknn",
],
),
LiteralInput(
"pcmci_max_lag",
"PCMCI Max Lag",
data_type="string",
abstract="Choose the maximum lag to test used in PCMCI.",
default="1",
allowed_values=[
"0",
"1",
"2",
"3",
"4",
"5",
],
),
LiteralInput(
"tefs_direction",
"TEFS Direction",
data_type="string",
abstract="Choose the direction of the TEFS algorithm.",
default="both",
allowed_values=[
"forward",
"backward",
"both",
],
),
LiteralInput(
"tefs_use_contemporary_features",
"TEFS Use Contemporary Features",
data_type="boolean",
abstract="Choose whether to use contemporary features in the TEFS algorithm.",
default=True,
),
LiteralInput(
"tefs_max_lag_features",
"TEFS Max Lag Features",
data_type="string",
abstract="Choose the maximum lag of the features in the TEFS algorithm.",
default="1",
allowed_values=[
"no_lag",
"1",
"2",
"3",
"4",
"5",
],
),
LiteralInput(
"tefs_max_lag_target",
"TEFS Max Lag Target",
data_type="string",
abstract="Choose the maximum lag of the target in the TEFS algorithm.",
default="1",
allowed_values=[
"1",
"2",
"3",
"4",
"5",
],
),
]
outputs = [
ComplexOutput(
"pkl_baseline",
"Baseline Scores",
abstract="The baseline scores on the initial data.",
as_reference=True,
supported_formats=[FORMAT_PICKLE],
),
ComplexOutput(
"plot_pcmci",
"Selected features by PCMCI",
abstract="The selected features by PCMCI.",
as_reference=True,
supported_formats=[FORMAT_PDF],
),
ComplexOutput(
"plot_pcmci_preview",
"Selected features by PCMCI",
abstract="The selected features by PCMCI.",
as_reference=True,
supported_formats=[FORMAT_PNG],
),
ComplexOutput(
"pkl_pcmci",
"PCMCI Results Details",
abstract="The PCMCI results details.",
as_reference=True,
supported_formats=[FORMAT_PICKLE],
),
ComplexOutput(
"plot_tefs",
"Selected features by TEFS",
abstract="The selected features by TEFS.",
as_reference=True,
supported_formats=[FORMAT_PDF],
),
ComplexOutput(
"plot_tefs_preview",
"Selected features by TEFS",
abstract="The selected features by TEFS.",
as_reference=True,
supported_formats=[FORMAT_PNG],
),
ComplexOutput(
"pkl_tefs",
"TEFS Results",
abstract="The TEFS results.",
as_reference=True,
supported_formats=[FORMAT_PICKLE],
),
ComplexOutput(
"plot_tefs_wrapper",
"Wrapper scores by TEFS",
abstract="The wrapper scores evolution by TEFS.",
as_reference=True,
supported_formats=[FORMAT_PDF],
),
ComplexOutput(
"plot_tefs_wrapper_preview",
"Wrapper scores by TEFS",
abstract="The wrapper scores evolution by TEFS.",
as_reference=True,
supported_formats=[FORMAT_PNG],
),
ComplexOutput(
"pkl_tefs_wrapper",
"TEFS Wrapper Scores Evolution details",
abstract="The TEFS wrapper scores evolution details.",
as_reference=True,
supported_formats=[FORMAT_PICKLE],
),
]
super(Causal, self).__init__(
self._handler,
identifier="causal",
title="Causal Analysis",
abstract="Performs a causal analysis with multiple configurations, returning outputs plots and pkl file.",
keywords=["causal", "analysis"],
metadata=[
Metadata("PyWPS", "https://pywps.org/"),
Metadata("Birdhouse", "http://bird-house.github.io/"),
Metadata("PyWPS Demo", "https://pywps-demo.readthedocs.io/en/latest/"),
Metadata("Emu: PyWPS examples", "https://emu.readthedocs.io/en/latest/"),
],
version="1.5",
inputs=inputs,
outputs=outputs,
store_supported=True,
status_supported=True,
)
def _handler(self, request, response):
response.update_status("Processing started", 0)
# Read the inputs
target_column_name = request.inputs["target_column_name"][0].data
df_train = pd.read_csv(request.inputs["dataset_train"][0].file, header=0)
df_test = pd.read_csv(request.inputs["dataset_test"][0].file, header=0)
LOGGER.info(f"Train shape: {df_train.shape}")
LOGGER.info(f"Test shape: {df_test.shape}")
LOGGER.info(f"Train columns: {df_train.columns}")
LOGGER.info(f"Test columns: {df_test.columns}")
pcmci_test_choice = request.inputs["pcmci_test_choice"][0].data
pcmci_max_lag = int(request.inputs["pcmci_max_lag"][0].data)
tefs_direction = request.inputs["tefs_direction"][0].data
tefs_use_contemporary_features = request.inputs["tefs_use_contemporary_features"][0].data
if str(request.inputs["tefs_max_lag_features"][0].data) == "no_lag":
tefs_max_lag_features = 0
else:
tefs_max_lag_features = int(request.inputs["tefs_max_lag_features"][0].data)
tefs_max_lag_target = int(request.inputs["tefs_max_lag_target"][0].data)
workdir = Path(self.workdir)
os.environ['MPLCONFIGDIR'] = os.path.join(workdir, "/matplotlib")
if not tefs_use_contemporary_features and tefs_max_lag_features == 0:
raise ValueError("You cannot use no lag features and not use contemporary features in TEFS.")
causal_analysis = CausalAnalysis(
df_train,
df_test,
target_column_name,
pcmci_test_choice,
pcmci_max_lag,
tefs_direction,
tefs_use_contemporary_features,
tefs_max_lag_features,
tefs_max_lag_target,
workdir,
response,
)
causal_analysis.run()
response.outputs["pkl_baseline"].file = causal_analysis.baseline
response.outputs["plot_pcmci"].file = causal_analysis.plot_pcmci["pdf"]
response.outputs["pkl_pcmci"].file = causal_analysis.details_pcmci
response.outputs["plot_tefs"].file = causal_analysis.plot_tefs["pdf"]
response.outputs["pkl_tefs"].file = causal_analysis.details_tefs
response.outputs["plot_tefs_wrapper"].file = causal_analysis.plot_tefs_wrapper["pdf"]
response.outputs["pkl_tefs_wrapper"].file = causal_analysis.details_tefs_wrapper
# Previews for the plots in png format
response.outputs["plot_pcmci_preview"].file = causal_analysis.plot_pcmci["png"]
response.outputs["plot_tefs_preview"].file = causal_analysis.plot_tefs["png"]
response.outputs["plot_tefs_wrapper_preview"].file = causal_analysis.plot_tefs_wrapper["png"]
response.update_status("Processing completed", 100)
return response