diff --git a/sigllm/benchmark.py b/sigllm/benchmark.py
new file mode 100644
index 0000000..e09e58a
--- /dev/null
+++ b/sigllm/benchmark.py
@@ -0,0 +1,452 @@
+# -*- coding: utf-8 -*-
+
+import argparse
+import ast
+import concurrent.futures
+import json
+import logging
+import os
+import uuid
+import warnings
+from copy import deepcopy
+from datetime import datetime
+from functools import partial
+from glob import glob
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+import tqdm
+from mlblocks import get_pipelines_paths
+from orion.benchmark import _load_signal, _parse_confusion_matrix, _sort_leaderboard
+from orion.data import load_anomalies
+from orion.evaluation import CONTEXTUAL_METRICS as METRICS
+from orion.evaluation import contextual_confusion_matrix
+from orion.progress import TqdmLogger, progress
+
+from sigllm import SigLLM
+from sigllm.data import load_normal
+
+warnings.simplefilter('ignore')
+
+LOGGER = logging.getLogger(__name__)
+
+BUCKET = 'sintel-sigllm'
+S3_URL = 'https://{}.s3.amazonaws.com/%7B%7D'
+
+BENCHMARK_PATH = os.path.join(
+    os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'), 'benchmark'
+)
+
+BENCHMARK_DATA = (
+    pd.read_csv(S3_URL.format(BUCKET, 'datasets.csv'), index_col=0, header=None)
+    .applymap(ast.literal_eval)
+    .to_dict()[1]
+)
+BENCHMARK_PARAMS = (
+    pd.read_csv(S3_URL.format(BUCKET, 'parameters.csv'), index_col=0, header=None)
+    .applymap(ast.literal_eval)
+    .to_dict()[1]
+)
+
+PIPELINE_DIR = os.path.join(os.path.dirname(__file__), 'pipelines')
+
+PIPELINES = {
+    'mistral_prompter_restricted': 'mistral_prompter',
+    'mistral_prompter_0shot': 'mistral_prompter_0shot',
+    'mistral_prompter_1shot': 'mistral_prompter_1shot',
+}
+
+
+def _get_pipeline_directory(pipeline_name):
+    if os.path.isfile(pipeline_name):
+        return os.path.dirname(pipeline_name)
+
+    pipelines_paths = get_pipelines_paths()
+    for base_path in pipelines_paths:
+        parts = pipeline_name.split('.')
+        number_of_parts = len(parts)
+
+        for folder_parts in range(number_of_parts):
+            folder = os.path.join(base_path, *parts[:folder_parts])
+            filename = '.'.join(parts[folder_parts:]) + '.json'
+            json_path = os.path.join(folder, filename)
+
+            if os.path.isfile(json_path):
+                return os.path.dirname(json_path)
+
+
+def _get_pipeline_hyperparameter(hyperparameters, dataset_name, pipeline_name):
+    hyperparameters_ = deepcopy(hyperparameters)
+
+    if hyperparameters:
+        hyperparameters_ = hyperparameters_.get(dataset_name) or hyperparameters_
+        hyperparameters_ = hyperparameters_.get(pipeline_name) or hyperparameters_
+
+    if hyperparameters_ is None and dataset_name and pipeline_name:
+        pipeline_path = _get_pipeline_directory(pipeline_name)
+        pipeline_dirname = os.path.basename(pipeline_path)
+        file_path = os.path.join(
+            pipeline_path, pipeline_dirname + '_' + dataset_name.lower() + '.json'
+        )
+        if os.path.exists(file_path):
+            hyperparameters_ = file_path
+
+    if isinstance(hyperparameters_, str) and os.path.exists(hyperparameters_):
+        with open(hyperparameters_) as f:
+            hyperparameters_ = json.load(f)
+
+    return hyperparameters_
+
+
+def _augment_hyperparameters(hyperparameters, few_shot):
+    hyperparameters_ = deepcopy(hyperparameters)
+    if few_shot:
+        for hyperparameter, value in hyperparameters.items():
+            if 'time_segments_aggregate' in hyperparameter:
+                name = hyperparameter[:-1]
+                hyperparameters_[name + '2'] = value
+
+    return hyperparameters_
+
+
+def _evaluate_signal(
+    pipeline, signal, hyperparameter, metrics, test_split=False, few_shot=False, anomaly_path=None
+):
+    _, test = _load_signal(signal, test_split)
+    truth = load_anomalies(signal)
+
+    normal = None
+    if few_shot:
+        normal = load_normal(signal)
+
+    try:
+        LOGGER.info(
+            'Scoring pipeline %s on signal %s (test split: %s)', pipeline, signal, test_split
+        )
+
+        start = datetime.utcnow()
+        pipeline = SigLLM(pipeline, hyperparameters=hyperparameter)
+        anomalies = pipeline.detect(test, normal=normal)
+        elapsed = datetime.utcnow() - start
+
+        scores = {name: scorer(truth, anomalies, test) for name, scorer in metrics.items()}
+
+        status = 'OK'
+
+    except Exception as ex:
+        LOGGER.exception(
+            'Exception scoring pipeline %s on signal %s (test split: %s), error %s.',
+            pipeline,
+            signal,
+            test_split,
+            ex,
+        )
+
+        elapsed = datetime.utcnow() - start
+        anomalies = pd.DataFrame([], columns=['start', 'end', 'score'])
+        scores = {name: 0 for name in metrics.keys()}
+
+        status = 'ERROR'
+
+    if 'confusion_matrix' in metrics.keys():
+        _parse_confusion_matrix(scores, truth)
+
+    scores['status'] = status
+    scores['elapsed'] = elapsed.total_seconds()
+    scores['split'] = test_split
+
+    if anomaly_path:
+        anomalies.to_csv(anomaly_path, index=False)
+
+    return scores
+
+
+def _run_job(args):
+    # Reset random seed
+    np.random.seed()
+
+    (
+        pipeline,
+        pipeline_name,
+        dataset,
+        signal,
+        hyperparameter,
+        metrics,
+        test_split,
+        few_shot,
+        iteration,
+        cache_dir,
+        anomaly_dir,
+        run_id,
+    ) = args
+
+    anomaly_path = anomaly_dir
+    if anomaly_dir:
+        base_path = str(anomaly_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}')
+        anomaly_path = base_path + '_anomalies.csv'
+
+    LOGGER.info(
+        'Evaluating pipeline %s on signal %s dataset %s (test split: %s); iteration %s',
+        pipeline_name,
+        signal,
+        dataset,
+        test_split,
+        iteration,
+    )
+
+    output = _evaluate_signal(
+        pipeline, signal, hyperparameter, metrics, test_split, few_shot, anomaly_path
+    )
+    scores = pd.DataFrame.from_records([output], columns=output.keys())
+
+    scores.insert(0, 'dataset', dataset)
+    scores.insert(1, 'pipeline', pipeline_name)
+    scores.insert(2, 'signal', signal)
+    scores.insert(3, 'iteration', iteration)
+    scores['run_id'] = run_id
+
+    if cache_dir:
+        base_path = str(cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}_{run_id}')
+        scores.to_csv(base_path + '_scores.csv', index=False)
+
+    return scores
+
+
+def _run_on_dask(jobs, verbose):
+    """Run the tasks in parallel using dask."""
+    try:
+        import dask
+    except ImportError as ie:
+        ie.msg += (
+            '\n\nIt seems like `dask` is not installed.\n'
+            'Please install `dask` and `distributed` using:\n'
+            '\n pip install dask distributed'
+        )
+        raise
+
+    scorer = dask.delayed(_run_job)
+    persisted = dask.persist(*[scorer(args) for args in jobs])
+    if verbose:
+        try:
+            progress(persisted)
+        except ValueError:
+            pass
+
+    return dask.compute(*persisted)
+
+
+def benchmark(
+    pipelines=None,
+    datasets=None,
+    hyperparameters=None,
+    metrics=METRICS,
+    rank='f1',
+    test_split=False,
+    iterations=1,
+    workers=1,
+    show_progress=False,
+    cache_dir=None,
+    anomaly_dir=None,
+    resume=False,
+    output_path=None,
+):
+    """Run pipelines on the given datasets and evaluate the performance.
+
+    The pipelines are used to analyze the given signals and the detected
+    anomalies are then scored against the known anomalies using the
+    indicated metrics.
+
+    Finally, the scores obtained with each metric are averaged across all the signals,
+    ranked by the indicated metric and returned as a ``pandas.DataFrame``.
+
+    Args:
+        pipelines (dict or list): dictionary with pipeline names as keys and their
+            JSON paths as values. If a list is given, it should be of JSON paths,
+            and the paths themselves will be used as names. If not given, all verified
+            pipelines will be used for evaluation.
+        datasets (dict or list): dictionary with dataset names as keys and lists of signals as
+            values. If a list is given, it will be placed under a generic name ``dataset``.
+            If not given, all benchmark datasets will be used.
+        hyperparameters (dict or list): dictionary with pipeline names as keys
+            and their hyperparameter JSON paths or dictionaries as values. If a list is
+            given, it should be of corresponding order to pipelines.
+        metrics (dict or list): dictionary with metric names as keys and
+            scoring functions as values. If a list is given, it should be of scoring
+            functions, and their ``__name__`` value will be used as the metric name.
+            If not given, all the available metrics will be used.
+        rank (str): Sort and rank the pipelines based on the given metric.
+            If not given, rank using the first metric.
+        test_split (bool or float): Whether to use the prespecified train-test split. If
+            float, then it should be between 0.0 and 1.0 and represent the proportion of
+            the signal to include in the test split. If not given, use ``False``.
+        iterations (int):
+            Number of iterations to perform over each signal and pipeline. Defaults to 1.
+        workers (int or str):
+            If ``workers`` is given as an integer value other than 0 or 1, a multiprocessing
+            Pool is used to distribute the computation across the indicated number of workers.
+            If the string ``dask`` is given, the computation is distributed using ``dask``.
+            In this case, setting up the ``dask`` cluster and client is expected to be handled
+            outside of this function.
+        show_progress (bool):
+            Whether to use tqdm to keep track of the progress. Defaults to ``False``.
+        cache_dir (str):
+            If a ``cache_dir`` is given, intermediate results are stored in the indicated
+            directory as CSV files as they get computed. This allows inspecting results while
+            the benchmark is still running and also recovering results in case the process
+            does not finish properly. Defaults to ``None``.
+        anomaly_dir (str):
+            If an ``anomaly_dir`` is given, detected anomalies will get dumped in the specified
+            directory as CSV files. Defaults to ``None``.
+        resume (bool):
+            Whether to continue running the experiments in the benchmark from the current
+            progress in ``cache_dir``.
+        output_path (str): Location to save the benchmark report. If not given,
+            the report will not be saved.
+
+    Returns:
+        pandas.DataFrame:
+            A table containing the scores obtained with each scoring function across
+            all the signals for each pipeline.
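+
+    Example:
+        A minimal run, with illustrative dataset and signal names::
+
+            scores = benchmark(
+                pipelines=['mistral_prompter_0shot'],
+                datasets={'NAB': ['art_daily_flatmiddle']},
+                iterations=1,
+            )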
+    """
+    pipelines = pipelines or PIPELINES
+    datasets = datasets or BENCHMARK_DATA
+    run_id = os.getenv('RUN_ID') or str(uuid.uuid4())[:10]
+
+    if isinstance(pipelines, list):
+        pipelines = {pipeline: pipeline for pipeline in pipelines}
+
+    if isinstance(datasets, list):
+        datasets = {'dataset': datasets}
+
+    if isinstance(hyperparameters, list):
+        hyperparameters = {
+            pipeline: hyperparameter
+            for pipeline, hyperparameter in zip(pipelines.keys(), hyperparameters)
+        }
+
+    if isinstance(metrics, list):
+        metrics_ = dict()
+        for metric in metrics:
+            if callable(metric):
+                metrics_[metric.__name__] = metric
+            elif metric in METRICS:
+                metrics_[metric] = METRICS[metric]
+            else:
+                raise ValueError('Unknown metric: {}'.format(metric))
+
+        metrics = metrics_
+
+    if cache_dir:
+        cache_dir = Path(cache_dir)
+        os.makedirs(cache_dir, exist_ok=True)
+
+    if anomaly_dir:
+        anomaly_dir = Path(anomaly_dir)
+        os.makedirs(anomaly_dir, exist_ok=True)
+
+    jobs = list()
+    for dataset, signals in datasets.items():
+        for pipeline_name, pipeline in pipelines.items():
+            hyperparameter = _get_pipeline_hyperparameter(hyperparameters, dataset, pipeline_name)
+            parameters = BENCHMARK_PARAMS.get(dataset)
+
+            few_shot = '1shot' in pipeline_name.lower()
+            hyperparameter = _augment_hyperparameters(hyperparameter, few_shot)
+
+            if parameters is not None:
+                test_split, = parameters.values()
+            for signal in signals:
+                for iteration in range(iterations):
+                    if resume:
+                        experiment = str(
+                            cache_dir / f'{pipeline_name}_{signal}_{dataset}_{iteration}'
+                        )
+                        if len(glob(experiment + '*.csv')) > 0:
+                            LOGGER.warning(f'skipping {experiment}')
+                            continue
+
+                    args = (
+                        pipeline,
+                        pipeline_name,
+                        dataset,
+                        signal,
+                        hyperparameter,
+                        metrics,
+                        test_split,
+                        few_shot,
+                        iteration,
+                        cache_dir,
+                        anomaly_dir,
+                        run_id,
+                    )
+                    jobs.append(args)
+
+    if workers == 'dask':
+        scores = _run_on_dask(jobs, show_progress)
+    else:
+        if workers in (0, 1):
+            scores = map(_run_job, jobs)
+        else:
+            pool = concurrent.futures.ProcessPoolExecutor(workers)
+            scores = pool.map(_run_job, jobs)
+
+        scores = tqdm.tqdm(scores, total=len(jobs), file=TqdmLogger())
+        if show_progress:
+            scores = tqdm.tqdm(scores, total=len(jobs))
+
+    if jobs:
+        scores = pd.concat(scores)
+        if output_path:
+            LOGGER.info('Saving benchmark report to %s', output_path)
+            scores.to_csv(output_path, index=False)
+
+        return _sort_leaderboard(scores, rank, metrics)
+
+    LOGGER.info('No scores to be recorded.')
+    return pd.DataFrame()
+
+
+def main(pipelines, datasets, resume, workers, output_path, cache_dir, anomaly_dir, **kwargs):
+    """Main to call benchmark function."""
+    # output path
+    output_path = os.path.join(BENCHMARK_PATH, 'results', output_path)
+
+    # metrics
+    del METRICS['accuracy']
+    METRICS['confusion_matrix'] = contextual_confusion_matrix
+    metrics = {k: partial(fun, weighted=False) for k, fun in METRICS.items()}
+
+    results = benchmark(
+        pipelines=pipelines,
+        datasets=datasets,
+        metrics=metrics,
+        output_path=output_path,
+        workers=workers,
+        resume=resume,
+        cache_dir=cache_dir,
+        anomaly_dir=anomaly_dir,
+    )
+
+    return results
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument('-p', '--pipelines', nargs='+', type=str, default=PIPELINES)
+    parser.add_argument('-d', '--datasets', nargs='+', type=str, default=BENCHMARK_DATA)
+    parser.add_argument('-r', '--resume', action='store_true')
+    parser.add_argument('-w', '--workers', default=1)
+
+    parser.add_argument('-o', '--output_path', type=str, default='results.csv')
+    parser.add_argument('-c', '--cache_dir', type=str, default='cache')
+    parser.add_argument('-ad', '--anomaly_dir', type=str, default='anomaly_dir')
+
+    config = parser.parse_args()
+
+    # `workers` may be an integer or the string 'dask'
+    if config.workers != 'dask':
+        config.workers = int(config.workers)
+
+    # map known dataset names to their signal lists
+    if all(dataset in BENCHMARK_DATA.keys() for dataset in config.datasets):
+        config.datasets = dict((dataset, BENCHMARK_DATA[dataset]) for dataset in config.datasets)
+
+    results = main(**vars(config))
diff --git a/sigllm/core.py b/sigllm/core.py
index 3e55407..0008002 100644
--- a/sigllm/core.py
+++ b/sigllm/core.py
@@ -100,8 +100,14 @@ def __repr__(self):
         return ('SigLLM:\n{}\nhyperparameters:\n{}\n').format(pipeline, hyperparameters)
 
-    def detect(self, data: pd.DataFrame, visualization: bool = False, **kwargs) -> pd.DataFrame:
-        """Detect anomalies in the given data..
+    def detect(
+        self,
+        data: pd.DataFrame,
+        normal: pd.DataFrame = None,
+        visualization: bool = False,
+        **kwargs,
+    ) -> pd.DataFrame:
+        """Detect anomalies in the given data.
 
         If ``visualization=True``, also return the visualization
         outputs from the MLPipeline object.
@@ -110,6 +116,10 @@ def detect(self, data: pd.DataFrame, visualization: bool = False, **kwargs) -> p
             data (DataFrame):
                 Input data, passed as a ``pandas.DataFrame`` containing
                 exactly two columns: timestamp and value.
+            normal (DataFrame, optional):
+                Normal reference data for one-shot prompting, passed as a ``pandas.DataFrame``
+                containing exactly two columns: timestamp and value. If None, zero-shot
+                prompting is used. Default to None.
             visualization (bool):
                 If ``True``, also capture the ``visualization`` named
                 output from the ``MLPipeline`` and return it as a second
@@ -125,6 +135,9 @@ def detect(self, data: pd.DataFrame, visualization: bool = False, **kwargs) -> p
         if not self._fitted:
             self._mlpipeline = self._get_mlpipeline()
 
+        if normal is not None:
+            kwargs['normal'] = normal
+
         result = self._detect(self._mlpipeline.fit, data, visualization, **kwargs)
         self._fitted = True
diff --git a/sigllm/data.py b/sigllm/data.py
new file mode 100644
index 0000000..dc2e161
--- /dev/null
+++ b/sigllm/data.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+
+"""Data Management module.
+
+This module contains functions to download demo data from Amazon S3,
+as well as to load and work with data stored locally.
+"""
+
+import logging
+import os
+
+import pandas as pd
+
+LOGGER = logging.getLogger(__name__)
+
+DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
+BUCKET = 'sintel-sigllm'
+S3_URL = 'https://{}.s3.amazonaws.com/%7B%7D'
+
+
+def download_normal(name, test_size=None, data_path=DATA_PATH):
+    """Load the CSV with the given name from S3.
+
+    If the CSV has never been loaded before, it will be downloaded
+    from the [sintel-sigllm bucket](https://sintel-sigllm.s3.amazonaws.com) or
+    the S3 bucket specified following the `s3://{bucket}/path/to/the.csv` format,
+    and then cached inside the `data` folder, within the `sigllm` package
+    directory, and then returned.
+
+    Otherwise, if it has been downloaded and cached before, it will be directly
+    loaded from the `sigllm/data` folder without contacting S3.
+
+    If a `test_size` value is given, the data will be split in two parts
+    without altering its order, making the second one proportionally as
+    big as the given value.
+
+    Args:
+        name (str): Name of the CSV to load.
+        test_size (float): Value between 0 and 1 indicating the proportional
+            size of the test split. If 0 or None (default), the data is not split.
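+
+    Example:
+        `download_normal('s3://my-bucket/normals/signal.csv')` (hypothetical
+        bucket and path) fetches the file from the given bucket, while
+        `download_normal('signal')` resolves to `signal_normal.csv` in the
+        default `sintel-sigllm` bucket.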
+ + Returns: + If no test_size is given, a single pandas.DataFrame is returned containing all + the data. If test_size is given, a tuple containing one pandas.DataFrame for + the train split and another one for the test split is returned. + + Raises: + FileNotFoundError: If the normal file doesn't exist locally and can't + be downloaded from S3. + """ + try: + url = None + if name.startswith('s3://'): + parts = name[5:].split('/', 1) + bucket = parts[0] + path = parts[1] + url = S3_URL.format(bucket, path) + filename = os.path.join(data_path, path.split('/')[-1]) + else: + filename = os.path.join(data_path, name + '_normal.csv') + data_path = os.path.join(data_path, os.path.dirname(name)) + + if os.path.exists(filename): + data = pd.read_csv(filename) + return data + + url = url or S3_URL.format(BUCKET, '{}_normal.csv'.format(name)) + LOGGER.info('Downloading CSV %s from %s', name, url) + + try: + data = pd.read_csv(url) + os.makedirs(data_path, exist_ok=True) + data.to_csv(filename, index=False) + return data + except Exception: + error_msg = ( + f'Could not download or find normal file for {name}. ' + f'Please ensure the file exists at {filename} or can be ' + f'downloaded from {url}' + ) + LOGGER.error(error_msg) + raise FileNotFoundError(error_msg) + + except Exception as e: + error_msg = f'Error processing normal file for {name}: {str(e)}' + LOGGER.error(error_msg) + raise FileNotFoundError(error_msg) + + +def format_csv(df, timestamp_column=None, value_columns=None): + """Format CSV data with timestamp and value columns. + + Args: + df (pd.DataFrame): Input DataFrame + timestamp_column: Column index or name for timestamp + value_columns: Column index or name for values + + Returns: + pd.DataFrame: Formatted DataFrame with timestamp and values + """ + timestamp_column_name = df.columns[timestamp_column] if timestamp_column else df.columns[0] + value_column_names = df.columns[value_columns] if value_columns else df.columns[1:] + + data = dict() + data['timestamp'] = df[timestamp_column_name].astype('int64').values + for column in value_column_names: + data[column] = df[column].astype(float).values + + return pd.DataFrame(data) + + +def load_csv(path, timestamp_column=None, value_column=None): + """Load and format CSV file. + + Args: + path (str): Path to CSV file + timestamp_column: Column index or name for timestamp + value_column: Column index or name for values + + Returns: + pd.DataFrame: Loaded and formatted DataFrame + + Raises: + ValueError: If column specifications are invalid + """ + header = None if timestamp_column is not None else 'infer' + data = pd.read_csv(path, header=header) + + if timestamp_column is None: + if value_column is not None: + raise ValueError('If value_column is provided, timestamp_column must be as well') + return data + + elif value_column is None: + raise ValueError('If timestamp_column is provided, value_column must be as well') + elif timestamp_column == value_column: + raise ValueError('timestamp_column cannot be the same as value_column') + + return format_csv(data, timestamp_column, value_column) + + +def load_normal( + name, timestamp_column=None, value_column=None, start=None, end=None, use_timestamps=False +): + """Load normal data from file or download if needed. + + Args: + name (str): + Name or path of the normal data. + timestamp_column (str or int): + Column index or name for timestamp. + value_column (str or int): + Column index or name for values. + start (int or timestamp): + Optional. 
If specified, this will be start of the sub-sequence. + end (int or timestamp): + Optional. If specified, this will be end of the sub-sequence. + use_timestamps (bool): + If True, start and end are interpreted as timestamps. + If False, start and end are interpreted as row indices. + + Returns: + pandas.DataFrame: + Loaded subsequence with `timestamp` and `value` columns. + """ + if os.path.isfile(name): + data = load_csv(name, timestamp_column, value_column) + else: + data = download_normal(name) + + data = format_csv(data) + + # Handle slicing if start or end is specified + if start is not None or end is not None: + if use_timestamps: + # If start and end are timestamps + mask = (data['timestamp'] >= start) & (data['timestamp'] <= end) + data = data[mask] + else: + # If start and end are indices + data = data.iloc[start:end] + + return data diff --git a/sigllm/pipelines/prompter/mistral_prompter.json b/sigllm/pipelines/prompter/mistral_prompter.json index 0bc3e10..a1a5bb7 100644 --- a/sigllm/pipelines/prompter/mistral_prompter.json +++ b/sigllm/pipelines/prompter/mistral_prompter.json @@ -31,7 +31,8 @@ }, "sigllm.primitives.prompting.huggingface.HF#1": { "name": "mistralai/Mistral-7B-Instruct-v0.2", - "samples": 10 + "samples": 10, + "restrict_tokens": true }, "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows#1": { "alpha": 0.4 diff --git a/sigllm/pipelines/prompter/mistral_prompter_0shot.json b/sigllm/pipelines/prompter/mistral_prompter_0shot.json new file mode 100644 index 0000000..430b6ab --- /dev/null +++ b/sigllm/pipelines/prompter/mistral_prompter_0shot.json @@ -0,0 +1,74 @@ +{ + "primitives": [ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate", + "sklearn.impute.SimpleImputer", + "sigllm.primitives.transformation.Float2Scalar", + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences", + "sigllm.primitives.transformation.format_as_string", + + "sigllm.primitives.prompting.huggingface.HF", + "sigllm.primitives.transformation.parse_anomaly_response", + "sigllm.primitives.transformation.format_as_integer", + "sigllm.primitives.prompting.anomalies.val2idx", + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows", + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences", + "sigllm.primitives.prompting.anomalies.format_anomalies" + ], + "init_params": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600, + "method": "mean" + }, + "sigllm.primitives.transformation.Float2Scalar#1": { + "decimal": 2, + "rescale": true + }, + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences#1": { + "window_size": 200, + "step_size": 40 + }, + "sigllm.primitives.transformation.format_as_string#1": { + "space": false + }, + "sigllm.primitives.prompting.huggingface.HF#1": { + "name": "mistralai/Mistral-7B-Instruct-v0.2", + "samples": 3, + "temp": 0.01 + }, + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows#1": { + "alpha": 0.4 + }, + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences#1": { + "beta": 0.5 + } + }, + "input_names": { + "sigllm.primitives.prompting.huggingface.HF#1": { + "X": "X_str" + }, + "sigllm.primitives.transformation.parse_anomaly_response#1": { + "X": "y_hat" + }, + "sigllm.primitives.transformation.format_as_integer#1": { + "X": "y_parsed" + } + }, + "output_names": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "index": "timestamp" + }, + 
"sigllm.primitives.transformation.format_as_string#1": { + "X": "X_str" + }, + "sigllm.primitives.prompting.huggingface.HF#1": { + "y": "y_hat" + }, + "sigllm.primitives.transformation.parse_anomaly_response#1": { + "X": "y_parsed" + }, + "sigllm.primitives.transformation.format_as_integer#1": { + "X": "y" + } + } +} \ No newline at end of file diff --git a/sigllm/pipelines/prompter/mistral_prompter_1shot.json b/sigllm/pipelines/prompter/mistral_prompter_1shot.json new file mode 100644 index 0000000..ca36154 --- /dev/null +++ b/sigllm/pipelines/prompter/mistral_prompter_1shot.json @@ -0,0 +1,174 @@ +{ + "primitives": [ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate", + "sklearn.impute.SimpleImputer", + "sigllm.primitives.transformation.Float2Scalar", + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences", + "sigllm.primitives.transformation.format_as_string", + + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate", + "sklearn.impute.SimpleImputer", + "sigllm.primitives.transformation.Float2Scalar", + "sigllm.primitives.transformation.format_as_string", + + "sigllm.primitives.prompting.huggingface.HF", + "sigllm.primitives.transformation.parse_anomaly_response", + "sigllm.primitives.transformation.format_as_integer", + "sigllm.primitives.prompting.anomalies.val2idx", + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows", + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences", + "sigllm.primitives.prompting.anomalies.format_anomalies" + ], + "init_params": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600, + "method": "mean" + }, + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#2": { + "time_column": "normal_timestamp", + "interval": 21600, + "method": "mean" + }, + "sigllm.primitives.transformation.Float2Scalar#1": { + "decimal": 2, + "rescale": true + }, + "sigllm.primitives.transformation.Float2Scalar#2": { + "decimal": 2, + "rescale": true + }, + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences#1": { + "window_size": 200, + "step_size": 40 + }, + "sigllm.primitives.transformation.format_as_string#1": { + "space": false + }, + "sigllm.primitives.transformation.format_as_string#2": { + "space": false, + "normal": true + }, + "sigllm.primitives.prompting.huggingface.HF#1": { + "name": "mistralai/Mistral-7B-Instruct-v0.2", + "anomalous_percent": 0.5, + "samples": 1, + "temp": 0.01 + }, + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows#1": { + "alpha": 0.4 + }, + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences#1": { + "beta": 0.5 + } + }, + "input_names": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "X": "X", + "timestamp": "timestamp" + }, + "sklearn.impute.SimpleImputer#1": { + "X": "X_processed" + }, + "sigllm.primitives.transformation.Float2Scalar#1": { + "X": "X_imputed" + }, + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences#1": { + "X": "X_scalar" + }, + "sigllm.primitives.transformation.format_as_string#1": { + "X": "X_sequences" + }, + + + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#2": { + "X": "normal", + "timestamp": "normal_timestamp" + }, + "sklearn.impute.SimpleImputer#2": { + "X": "normal_processed" + }, + "sigllm.primitives.transformation.Float2Scalar#2": { + "X": "normal_imputed" + }, + "sigllm.primitives.transformation.format_as_string#2": { + "X": 
"normal_scalar" + }, + + + "sigllm.primitives.prompting.huggingface.HF#1": { + "X": "X_str", + "normal": "normal_str" + }, + "sigllm.primitives.transformation.parse_anomaly_response#1": { + "X": "y_hat" + }, + "sigllm.primitives.transformation.format_as_integer#1": { + "X": "y_parsed" + }, + "sigllm.primitives.prompting.anomalies.val2idx#1": { + "y": "y_intermediate", + "X": "X_sequences" + }, + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows#1": { + "y": "y_idx" + }, + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences#1": { + "y": "y_windows" + }, + "sigllm.primitives.prompting.anomalies.format_anomalies#1": { + "y": "y_merged" + } + }, + "output_names": { + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "X": "X_processed", + "index": "timestamp" + }, + "sklearn.impute.SimpleImputer#1": { + "X": "X_imputed" + }, + "sigllm.primitives.transformation.Float2Scalar#1": { + "X": "X_scalar" + }, + "sigllm.primitives.prompting.timeseries_preprocessing.rolling_window_sequences#1": { + "X": "X_sequences" + }, + "sigllm.primitives.transformation.format_as_string#1": { + "X": "X_str" + }, + + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#2": { + "X": "normal_processed", + "index": "normal_timestamp" + }, + "sklearn.impute.SimpleImputer#2": { + "X": "normal_imputed" + }, + "sigllm.primitives.transformation.Float2Scalar#2": { + "X": "normal_scalar" + }, + "sigllm.primitives.transformation.format_as_string#2": { + "X": "normal_str" + }, + + "sigllm.primitives.prompting.huggingface#1": { + "y": "y_hat" + }, + "sigllm.primitives.transformation.parse_anomaly_response#1": { + "X": "y_parsed" + }, + "sigllm.primitives.transformation.format_as_integer#1": { + "X": "y_intermediate" + }, + "sigllm.primitives.prompting.anomalies.val2idx#1": { + "y": "y_idx" + }, + "sigllm.primitives.prompting.anomalies.find_anomalies_in_windows#1": { + "y": "y_windows" + }, + "sigllm.primitives.prompting.anomalies.merge_anomalous_sequences#1": { + "y": "y_merged" + } + } +} diff --git a/sigllm/pipelines/prompter/prompter_artificialwithanomaly.json b/sigllm/pipelines/prompter/prompter_artificialwithanomaly.json new file mode 100644 index 0000000..eebcc81 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_artificialwithanomaly.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_msl.json b/sigllm/pipelines/prompter/prompter_msl.json new file mode 100644 index 0000000..e4fe0c1 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_msl.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_realadexchange.json b/sigllm/pipelines/prompter/prompter_realadexchange.json new file mode 100644 index 0000000..6b8aac0 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_realadexchange.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 3600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_realawscloudwatch.json b/sigllm/pipelines/prompter/prompter_realawscloudwatch.json new file mode 100644 index 0000000..eebcc81 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_realawscloudwatch.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { 
+ "time_column": "timestamp", + "interval": 600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_realtraffic.json b/sigllm/pipelines/prompter/prompter_realtraffic.json new file mode 100644 index 0000000..eebcc81 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_realtraffic.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_realtweets.json b/sigllm/pipelines/prompter/prompter_realtweets.json new file mode 100644 index 0000000..eebcc81 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_realtweets.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 600 + } +} diff --git a/sigllm/pipelines/prompter/prompter_smap.json b/sigllm/pipelines/prompter/prompter_smap.json new file mode 100644 index 0000000..e4fe0c1 --- /dev/null +++ b/sigllm/pipelines/prompter/prompter_smap.json @@ -0,0 +1,6 @@ +{ + "mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": { + "time_column": "timestamp", + "interval": 21600 + } +} diff --git a/sigllm/primitives/jsons/sigllm.primitives.prompting.huggingface.HF.json b/sigllm/primitives/jsons/sigllm.primitives.prompting.huggingface.HF.json index 91d0530..b78afc8 100644 --- a/sigllm/primitives/jsons/sigllm.primitives.prompting.huggingface.HF.json +++ b/sigllm/primitives/jsons/sigllm.primitives.prompting.huggingface.HF.json @@ -16,6 +16,11 @@ { "name": "X", "type": "ndarray" + }, + { + "name": "normal", + "type": "ndarray", + "default": null } ], "output": [ @@ -63,6 +68,10 @@ "padding": { "type": "int", "default": 0 + }, + "restrict_tokens": { + "type": "bool", + "default": false } } } diff --git a/sigllm/primitives/jsons/sigllm.primitives.transformation.format_as_string.json b/sigllm/primitives/jsons/sigllm.primitives.transformation.format_as_string.json index 89d18f5..8f0b115 100644 --- a/sigllm/primitives/jsons/sigllm.primitives.transformation.format_as_string.json +++ b/sigllm/primitives/jsons/sigllm.primitives.transformation.format_as_string.json @@ -4,7 +4,7 @@ "Sarah Alnegheimish ", "Linh Nguyen " ], - "description": "Transform an ndarray of scalar values to an ndarray of string.", + "description": "Format X to string(s). 
Handles both normal time series (single string) and multiple windows (list of strings).", "classifiers": { "type": "preprocessor", "subtype": "tranformer" @@ -34,6 +34,10 @@ "space": { "type": "bool", "default": false + }, + "normal": { + "type": "bool", + "default": false } } } diff --git a/sigllm/primitives/jsons/sigllm.primitives.transformation.parse_anomaly_response.json b/sigllm/primitives/jsons/sigllm.primitives.transformation.parse_anomaly_response.json new file mode 100644 index 0000000..a7ff470 --- /dev/null +++ b/sigllm/primitives/jsons/sigllm.primitives.transformation.parse_anomaly_response.json @@ -0,0 +1,25 @@ +{ + "name": "sigllm.primitives.transformation.parse_anomaly_response", + "contributors": ["Salim Cherkaoui"], + "description": "Parse LLM responses to extract anomaly values from text format.", + "classifiers": { + "type": "transformer", + "subtype": "parser" + }, + "modalities": ["text"], + "primitive": "sigllm.primitives.transformation.parse_anomaly_response", + "produce": { + "args": [ + { + "name": "X", + "type": "ndarray" + } + ], + "output": [ + { + "name": "X", + "type": "ndarray" + } + ] + } +} \ No newline at end of file diff --git a/sigllm/primitives/prompting/anomalies.py b/sigllm/primitives/prompting/anomalies.py index d70164d..82c462f 100644 --- a/sigllm/primitives/prompting/anomalies.py +++ b/sigllm/primitives/prompting/anomalies.py @@ -35,6 +35,7 @@ def val2idx(y, X): idx_win_list.append(indices) idx_list.append(idx_win_list) idx_list = np.array(idx_list, dtype=object) + return idx_list @@ -57,7 +58,6 @@ def find_anomalies_in_windows(y, alpha=0.5): idx_list = [] for samples in y: min_vote = np.ceil(alpha * len(samples)) - # print(type(samples.tolist())) flattened_res = np.concatenate(samples.tolist()) @@ -67,6 +67,7 @@ def find_anomalies_in_windows(y, alpha=0.5): idx_list.append(final_list) idx_list = np.array(idx_list, dtype=object) + return idx_list @@ -112,7 +113,7 @@ def format_anomalies(y, timestamp, padding_size=50): Args: y (ndarray): - A 1-dimensional array of indices. + A 1-dimensional array of indices. Can be empty if no anomalies are found. timestamp (ndarray): List of full timestamp of the signal. padding_size (int): @@ -120,8 +121,12 @@ def format_anomalies(y, timestamp, padding_size=50): Returns: List[Tuple]: - List of intervals (start, end, score). + List of intervals (start, end, score). Empty list if no anomalies are found. """ + # Handle empty array case + if len(y) == 0: + return [] + y = timestamp[y] # Convert list of indices into list of timestamps start, end = timestamp[0], timestamp[-1] interval = timestamp[1] - timestamp[0] @@ -151,4 +156,5 @@ def format_anomalies(y, timestamp, padding_size=50): merged_intervals.append(current_interval) # Append the current interval if no overlap merged_intervals = [(interval[0], interval[1], 0) for interval in merged_intervals] + return merged_intervals diff --git a/sigllm/primitives/prompting/huggingface.py b/sigllm/primitives/prompting/huggingface.py index ac33874..301253e 100644 --- a/sigllm/primitives/prompting/huggingface.py +++ b/sigllm/primitives/prompting/huggingface.py @@ -47,6 +47,8 @@ class HF: padding (int): Additional padding token to forecast to reduce short horizon predictions. Default to `0`. + restrict_tokens (bool): + Whether to restrict tokens or not. Default to `False`. 
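+
+    When `restrict_tokens` is enabled, every token id outside the digit tokens in
+    `VALID_NUMBERS` (plus the separator) is passed to `generate` as `bad_words_ids`,
+    so sampling is constrained to numeric output.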
""" def __init__( @@ -59,6 +61,7 @@ def __init__( raw=False, samples=10, padding=0, + restrict_tokens=False, ): self.name = name self.sep = sep @@ -68,6 +71,7 @@ def __init__( self.raw = raw self.samples = samples self.padding = padding + self.restrict_tokens = restrict_tokens self.tokenizer = AutoTokenizer.from_pretrained(self.name, use_fast=False) @@ -85,16 +89,19 @@ def __init__( self.tokenizer.add_special_tokens(special_tokens_dict) self.tokenizer.pad_token = self.tokenizer.eos_token # indicate the end of the time series - # invalid tokens - valid_tokens = [] - for number in VALID_NUMBERS: - token = self.tokenizer.convert_tokens_to_ids(number) - valid_tokens.append(token) + # Only set up invalid tokens if restriction is enabled + if self.restrict_tokens: + valid_tokens = [] + for number in VALID_NUMBERS: + token = self.tokenizer.convert_tokens_to_ids(number) + valid_tokens.append(token) - valid_tokens.append(self.tokenizer.convert_tokens_to_ids(self.sep)) - self.invalid_tokens = [ - [i] for i in range(len(self.tokenizer) - 1) if i not in valid_tokens - ] + valid_tokens.append(self.tokenizer.convert_tokens_to_ids(self.sep)) + self.invalid_tokens = [ + [i] for i in range(len(self.tokenizer) - 1) if i not in valid_tokens + ] + else: + self.invalid_tokens = None self.model = AutoModelForCausalLM.from_pretrained( self.name, @@ -104,12 +111,15 @@ def __init__( self.model.eval() - def detect(self, X, **kwargs): + def detect(self, X, normal=None, **kwargs): """Use HF to detect anomalies of a signal. Args: X (ndarray): Input sequences of strings containing signal values + normal (str, optional): + A normal reference sequence for one-shot prompting. If None, + zero-shot prompting is used. Default to None. Returns: list, list: @@ -120,31 +130,67 @@ def detect(self, X, **kwargs): max_tokens = input_length * float(self.anomalous_percent) all_responses, all_generate_ids = [], [] + # Prepare the one-shot example if provided + one_shot_message = '' + if normal is not None: + one_shot_message = PROMPTS['one_shot_prefix'] + normal + '\n\n' + for text in tqdm(X): system_message = PROMPTS['system_message'] - user_message = PROMPTS['user_message'] - message = ' '.join([system_message, user_message, text, '[RESPONSE]']) - - input_length = len(self.tokenizer.encode(message[0])) + if self.restrict_tokens: + user_message = PROMPTS['user_message'] + else: + user_message = PROMPTS['user_message_2'] + + # Combine messages with one-shot example if provided + message = ' '.join([ + system_message, + one_shot_message, + user_message, + text, + '[RESPONSE]', + ]) + + input_length = len(self.tokenizer.encode(message)) tokenized_input = self.tokenizer(message, return_tensors='pt').to('cuda') - generate_ids = self.model.generate( - **tokenized_input, - do_sample=True, - max_new_tokens=max_tokens, - temperature=self.temp, - top_p=self.top_p, - bad_words_ids=self.invalid_tokens, - renormalize_logits=True, - num_return_sequences=self.samples, - ) - - responses = self.tokenizer.batch_decode( - generate_ids[:, input_length:], - skip_special_tokens=True, - clean_up_tokenization_spaces=False, - ) + generate_kwargs = { + 'do_sample': True, + 'max_new_tokens': max_tokens, + 'temperature': self.temp, + 'top_p': self.top_p, + 'renormalize_logits': True, + 'num_return_sequences': self.samples, + } + + # Only add bad_words_ids if token restriction is enabled + if self.restrict_tokens: + generate_kwargs['bad_words_ids'] = self.invalid_tokens + + generate_ids = self.model.generate(**tokenized_input, **generate_kwargs) + + if 
self.restrict_tokens: + responses = self.tokenizer.batch_decode( + generate_ids[:, input_length:], + skip_special_tokens=True, + clean_up_tokenization_spaces=False, + ) + else: # Extract only the part after [RESPONSE] + # Get the full generated text + full_responses = self.tokenizer.batch_decode( + generate_ids, + skip_special_tokens=True, + clean_up_tokenization_spaces=False, + ) + responses = [] + for full_response in full_responses: + try: + response = full_response.split('[RESPONSE]')[1].strip() + responses.append(response) + except IndexError: + responses.append('') # If no [RESPONSE] found, return empty string + all_responses.append(responses) all_generate_ids.append(generate_ids) diff --git a/sigllm/primitives/prompting/huggingface_messages.json b/sigllm/primitives/prompting/huggingface_messages.json index 3ad1dad..1b57617 100644 --- a/sigllm/primitives/prompting/huggingface_messages.json +++ b/sigllm/primitives/prompting/huggingface_messages.json @@ -1,4 +1,6 @@ { - "system_message": "You are an exceptionally intelligent assistant that detect anomalies in time series data by listing all the anomalies.", - "user_message": "Below is a [SEQUENCE], please return the anomalies in that sequence in [RESPONSE]. Only return the numbers. [SEQUENCE]" + "system_message": "You are an expert in time series analysis. Your task is to detect anomalies in time series data.", + "user_message": "Below is a [SEQUENCE], please return the anomalies in that sequence in [RESPONSE]. Only return the numbers. [SEQUENCE]", + "user_message_2": "Below is a [SEQUENCE], analyze the following time series and identify any anomalies. If you find anomalies, provide their values in the format [first_anomaly, ..., last_anomaly]. If no anomalies are found, respond with 'no anomalies'. Be concise, do not write code, do not permorm any calculations, just give your answers as told.: [SEQUENCE]", + "one_shot_prefix": "Here is a normal reference of the time series: [NORMAL]" } \ No newline at end of file diff --git a/sigllm/primitives/prompting/timeseries_preprocessing.py b/sigllm/primitives/prompting/timeseries_preprocessing.py index e5d3644..fee3de9 100644 --- a/sigllm/primitives/prompting/timeseries_preprocessing.py +++ b/sigllm/primitives/prompting/timeseries_preprocessing.py @@ -37,5 +37,4 @@ def rolling_window_sequences(X, window_size=500, step_size=100): out_X.append(X[start:end]) X_index.append(index[start]) start = start + step_size - return np.asarray(out_X), np.asarray(X_index), window_size, step_size diff --git a/sigllm/primitives/transformation.py b/sigllm/primitives/transformation.py index 41a98fc..5037131 100644 --- a/sigllm/primitives/transformation.py +++ b/sigllm/primitives/transformation.py @@ -6,34 +6,43 @@ import numpy as np -def format_as_string(X, sep=',', space=False): +def format_as_string(X, sep=',', space=False, normal=False): """Format X to a list of string. - Transform a 2-D array of integers to a list of strings, - seperated by the indicated seperator and space. + Transform an array of integers to string(s), separated by the indicated separator and space. + Handles two cases: + - If normal=True, treats X as a single time series (window_size, 1) + - If normal=False, treats X as multiple windows (num_windows, window_size, 1) Args: sep (str): String to separate each element in X. Default to `','`. space (bool): Whether to add space between each digit in the result. Default to `False`. + normal (bool): + Whether to treat X as a normal time series. 
If True, expects (window_size, 1) + and returns a single string. If False, expects (num_windows, window_size, 1) + and returns a list of strings. Default to `False`. Returns: - ndarray: - A list of string representation of each row. + ndarray or str: + If normal=True, returns a single string representation. If normal=False, + returns a list of string representations for each window. """ def _as_string(x): text = sep.join(list(map(str, x.flatten()))) - if space: text = ' '.join(text) - return text - results = list(map(_as_string, X)) - - return np.array(results) + if normal: + # Handle as single time series (window_size, 1) + return _as_string(X) + else: + # Handle as multiple windows (num_windows, window_size, 1) + results = list(map(_as_string, X)) + return np.array(results) def _from_string_to_integer(text, sep=',', trunc=None, errors='ignore'): @@ -74,6 +83,7 @@ def format_as_integer(X, sep=',', trunc=None, errors='ignore'): Transforms a list of list of string input as 3-D array of integers, seperated by the indicated seperator and truncated based on `trunc`. + Handles empty strings by returning empty arrays. Args: sep (str): @@ -91,7 +101,7 @@ def format_as_integer(X, sep=',', trunc=None, errors='ignore'): Returns: ndarray: - An array of digits values. + An array of digits values. Empty arrays for empty strings. """ result = list() for string_list in X: @@ -100,8 +110,11 @@ def format_as_integer(X, sep=',', trunc=None, errors='ignore'): raise ValueError('Input is not a list of lists.') for text in string_list: - scalar = _from_string_to_integer(text, sep, trunc, errors) - sample.append(scalar) + if not text: # Handle empty string + sample.append(np.array([], dtype=float)) + else: + scalar = _from_string_to_integer(text, sep, trunc, errors) + sample.append(scalar) result.append(sample) @@ -171,3 +184,81 @@ def transform(self, X, minimum=0, decimal=2): values = X * 10 ** (-decimal) return values + minimum + + +def parse_anomaly_response(X): + """Parse a list of lists of LLM responses to extract anomaly values and format them as strings. 
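+
+    For example, "Answer: [12, 35, 40]" is parsed to "12,35,40", while
+    "Answer: no anomalies" is parsed to "".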
+
+    Args:
+        X (List[List[str]]):
+            List of lists of response texts from the LLM in the format
+            "Answer: no anomalies" or "Answer: [val1, val2, ..., valN]"
+
+    Returns:
+        List[List[str]]:
+            List of lists of parsed responses where each element is either
+            "val1,val2,...,valN" if anomalies are found, or an empty string if
+            no anomalies are present
+    """
+
+    def _parse_single_response(text: str):
+        # Clean the input text
+        text = text.strip().lower()
+
+        # Check for "no anomalies" case
+        if 'no anomalies' in text or 'no anomaly' in text:
+            return ''
+
+        # Try to extract the values using regex
+        # Match anything inside square brackets that consists of digits and commas
+        pattern = r'\[([\d\s,]+)\]'
+        match = re.search(pattern, text)
+
+        if match:
+            # Extract the content inside brackets and clean it
+            values = match.group(1)
+            # Split by comma, strip whitespace, and filter out empty strings
+            values = [val.strip() for val in values.split(',') if val.strip()]
+            # Join the values with commas
+            return ','.join(values)
+
+        # Return empty string if no valid format is found
+        return ''
+
+    # Process each list of responses in the input
+    result = []
+    for response_list in X:
+        # Process each response in the inner list
+        parsed_list = [_parse_single_response(response) for response in response_list]
+        result.append(parsed_list)
+
+    return result
+
+
+def format_as_single_string(X, sep=',', space=False):
+    """Format a single time series to a string.
+
+    Transform a 1-D array of integers to a single string,
+    separated by the indicated separator and space.
+
+    Args:
+        sep (str):
+            String to separate each element in X. Default to `','`.
+        space (bool):
+            Whether to add space between each digit in the result. Default to `False`.
+
+    Returns:
+        str:
+            A string representation of the time series.
+    """
+    # Ensure X is 1D
+    if X.ndim > 1:
+        X = X.flatten()
+
+    text = sep.join(list(map(str, X)))
+
+    if space:
+        text = ' '.join(text)
+
+    return text
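
A minimal usage sketch of the one-shot flow added above; the CSV path and
signal name are hypothetical, while `SigLLM`, `load_normal`, and the
`mistral_prompter_1shot` pipeline are the ones introduced in this diff.

    import pandas as pd

    from sigllm import SigLLM
    from sigllm.data import load_normal

    # Signal to analyze: a DataFrame with exactly two columns,
    # timestamp and value.
    data = pd.read_csv('my_signal.csv')

    # Anomaly-free reference series used as the one-shot example;
    # load_normal resolves '<name>_normal.csv' locally or on S3.
    normal = load_normal('my_signal')

    # Passing `normal` forwards it to the pipeline as the 'normal' input,
    # which HF.detect prepends to the prompt through 'one_shot_prefix'.
    pipeline = SigLLM('mistral_prompter_1shot')
    anomalies = pipeline.detect(data, normal=normal)

    # `anomalies` lists the detected (start, end, score) intervals.
    print(anomalies)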