class IntelligentDataAnalyzer(BaseTool):
    """LangChain tool that profiles a tabular dataset supplied as row dicts.

    The rows are loaded into a ``pandas.DataFrame`` and, depending on
    ``analysis_type``, the tool runs correlation analysis, K-Means
    clustering and outlier detection over the numeric columns, optionally
    profiles one target column, and derives a short list of suggestions.
    ``_run`` returns a ``(summary_text, artifact_dict)`` pair, matching the
    ``"content_and_artifact"`` response format declared below.
    """

    # ``BaseTool`` identifies a tool through its ``name`` field.
    name: str = "intelligent_data_analyzer"
    description: str = "Superior information evaluation software that performs statistical evaluation, machine studying clustering, outlier detection, correlation evaluation, and generates visualizations with actionable insights."
    args_schema: type[BaseModel] = DataAnalysisInput
    response_format: str = "content_and_artifact"

    def _run(
        self,
        information: List[Dict],
        analysis_type: str = "complete",
        target_column: Optional[str] = None,
        max_clusters: int = 5,
    ) -> Tuple[str, Dict]:
        """Analyse ``information`` and return ``(summary, artifact)``.

        Args:
            information: Rows of the dataset, one dict per row.
                NOTE(review): this name must match the corresponding field
                of ``DataAnalysisInput`` - confirm against that schema.
            analysis_type: ``"complete"`` runs every analysis;
                ``"correlation"``, ``"clustering"`` or ``"outlier"`` runs only
                that one. Any other value yields dataset info only.
            target_column: Optional column to profile; ignored when it is
                falsy or not a column of the dataset.
            max_clusters: Largest cluster count tried by the clustering step.

        Returns:
            The human-readable summary and a dict holding the raw insights,
            the suggestions, the data shape, the analysis type and the
            numeric / ``object``-dtype column names.

        Raises:
            ToolException: If the dataset is empty ("Dataset is empty") or any
                analysis step fails ("Evaluation failed: ...").
        """
        try:
            df = pd.DataFrame(information)
            if df.empty:
                raise ToolException("Dataset is empty")

            insights = {"dataset_info": self._get_dataset_info(df)}
            if analysis_type in ("complete", "correlation"):
                insights["correlation_analysis"] = self._correlation_analysis(df)
            if analysis_type in ("complete", "clustering"):
                insights["clustering_analysis"] = self._clustering_analysis(df, max_clusters)
            if analysis_type in ("complete", "outlier"):
                insights["outlier_detection"] = self._outlier_detection(df)
            if target_column and target_column in df.columns:
                insights["target_analysis"] = self._target_analysis(df, target_column)

            recommendations = self._generate_recommendations(df, insights)
            summary = self._create_analysis_summary(insights, recommendations)
            artifact = {
                "insights": insights,
                "suggestions": recommendations,
                "data_shape": df.shape,
                "analysis_type": analysis_type,
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "categorical_columns": df.select_dtypes(include=["object"]).columns.tolist(),
            }
            return summary, artifact
        except ToolException:
            # Deliberate tool errors (e.g. the empty-dataset check) must reach
            # the caller unchanged instead of being re-wrapped below.
            raise
        except Exception as e:
            raise ToolException(f"Evaluation failed: {e}") from e

    def _get_dataset_info(self, df: pd.DataFrame) -> Dict:
        """Return shape, column names, dtypes, null counts and memory use of ``df``."""
        return {
            "form": df.shape,
            "columns": df.columns.tolist(),
            "dtypes": df.dtypes.astype(str).to_dict(),
            "missing_values": df.isnull().sum().to_dict(),
            # Plain int rather than a numpy scalar so the artifact serialises cleanly.
            "memory_usage": int(df.memory_usage(deep=True).sum()),
        }

    def _correlation_analysis(self, df: pd.DataFrame) -> Dict:
        """Correlate the numeric columns of ``df``.

        Returns a dict with the rounded correlation matrix, every column pair
        whose absolute correlation exceeds 0.7, and the average pairwise
        correlation. The average ignores undefined (NaN) pairs and is ``None``
        when no defined pair exists (single numeric column, constant columns).
        If ``df`` has no numeric columns a ``{"message": ...}`` dict is
        returned instead.
        """
        numeric_df = df.select_dtypes(include=[np.number])
        if numeric_df.empty:
            return {"message": "No numeric columns for correlation evaluation"}

        corr_matrix = numeric_df.corr()
        columns = corr_matrix.columns
        strong_corr = []
        # Walk the strict upper triangle so each pair is visited once.
        for i in range(len(columns)):
            for j in range(i + 1, len(columns)):
                corr_val = corr_matrix.iloc[i, j]
                # NaN compares False here, so undefined pairs are skipped.
                if abs(corr_val) > 0.7:
                    strong_corr.append({
                        "var1": columns[i],
                        "var2": columns[j],
                        "correlation": round(float(corr_val), 3),
                    })

        upper = corr_matrix.values[np.triu_indices_from(corr_matrix.values, k=1)]
        defined = upper[~np.isnan(upper)]
        avg_correlation = round(float(defined.mean()), 3) if defined.size else None

        return {
            "correlation_matrix": corr_matrix.round(3).to_dict(),
            "strong_correlations": strong_corr,
            "avg_correlation": avg_correlation,
        }

    def _clustering_analysis(self, df: pd.DataFrame, max_clusters: int) -> Dict:
        """Cluster the complete numeric rows of ``df`` with K-Means.

        Rows containing NaN in any numeric column are dropped, the remaining
        data is standardised, K-Means is fitted for k = 1 .. min(max_clusters,
        n_rows // 2), and the k chosen by ``_find_elbow_point`` is refitted to
        produce per-cluster sizes, percentages and column means.
        ``max_clusters`` below 1 is treated as 1. Returns a ``{"message": ...}``
        dict when fewer than two rows or two numeric columns remain.
        """
        numeric_df = df.select_dtypes(include=[np.number]).dropna()
        if numeric_df.shape[0] < 2 or numeric_df.shape[1] < 2:
            return {"message": "Inadequate numeric information for clustering"}

        scaled_data = StandardScaler().fit_transform(numeric_df)

        # Clamp so the range always contains at least k=1; an empty range
        # would leave no candidate for the elbow search.
        upper_bound = min(max(max_clusters, 1) + 1, len(numeric_df) // 2 + 1)
        k_range = range(1, upper_bound)
        inertias = []
        for k in k_range:
            model = KMeans(n_clusters=k, random_state=42, n_init=10)
            model.fit(scaled_data)
            inertias.append(model.inertia_)

        optimal_k = self._find_elbow_point(inertias, k_range)
        kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
        cluster_labels = kmeans.fit_predict(scaled_data)

        cluster_stats = {}
        for i in range(optimal_k):
            cluster_data = numeric_df[cluster_labels == i]
            cluster_stats[f"cluster_{i}"] = {
                "dimension": len(cluster_data),
                "proportion": round(len(cluster_data) / len(numeric_df) * 100, 1),
                "means": cluster_data.mean().round(3).to_dict(),
            }

        # The silhouette score is only computed when at least two distinct
        # labels were assigned; otherwise 0.0 is reported.
        if len(set(cluster_labels)) > 1:
            silhouette = round(float(silhouette_score(scaled_data, cluster_labels)), 3)
        else:
            silhouette = 0.0

        return {
            "optimal_clusters": optimal_k,
            "cluster_stats": cluster_stats,
            "silhouette_score": silhouette,
            "inertias": inertias,
        }

    def _outlier_detection(self, df: pd.DataFrame) -> Dict:
        """Count outliers per numeric column using the IQR rule and z-scores.

        For every numeric column (NaNs dropped) the result holds the number of
        values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``, the number of values
        with ``|z| > 3`` and the IQR-outlier share in percent. Columns with no
        non-null values report zeros. Returns a ``{"message": ...}`` dict when
        ``df`` has no numeric columns.
        """
        numeric_df = df.select_dtypes(include=[np.number])
        if numeric_df.empty:
            return {"message": "No numeric columns for outlier detection"}

        outliers = {}
        for col in numeric_df.columns:
            values = numeric_df[col].dropna()
            if values.empty:
                # All-NaN column: nothing to measure, and the percentage
                # below would otherwise divide by zero.
                outliers[col] = {
                    "iqr_outliers": 0,
                    "z_score_outliers": 0,
                    "outlier_percentage": 0.0,
                }
                continue

            q1, q3 = values.quantile(0.25), values.quantile(0.75)
            iqr = q3 - q1
            iqr_outliers = values[(values < q1 - 1.5 * iqr) | (values > q3 + 1.5 * iqr)]

            std = values.std()
            if pd.notna(std) and std > 0:
                z_scores = np.abs((values - values.mean()) / std)
                z_outlier_count = len(values[z_scores > 3])
            else:
                # Constant or single-value column: z-scores are undefined,
                # so no value can exceed the threshold.
                z_outlier_count = 0

            outliers[col] = {
                "iqr_outliers": len(iqr_outliers),
                "z_score_outliers": z_outlier_count,
                "outlier_percentage": round(len(iqr_outliers) / len(values) * 100, 2),
            }
        return outliers

    def _target_analysis(self, df: pd.DataFrame, target_col: str) -> Dict:
        """Profile a single column of ``df`` after dropping its nulls.

        Numeric columns get mean, median, std, skewness and kurtosis plus a
        distribution label based on ``|skew| < 0.5``. Other columns get the
        number of distinct values, the five most frequent values and the
        Shannon entropy (base 2) of the value frequencies. Returns an
        ``{"error": ...}`` dict when the column does not exist.
        """
        if target_col not in df.columns:
            return {"error": f"Column {target_col} not discovered"}

        target_data = df[target_col].dropna()
        if pd.api.types.is_numeric_dtype(target_data):
            skewness = target_data.skew()
            return {
                "sort": "numeric",
                "stats": {
                    "imply": round(target_data.mean(), 3),
                    "median": round(target_data.median(), 3),
                    "std": round(target_data.std(), 3),
                    "skewness": round(skewness, 3),
                    "kurtosis": round(target_data.kurtosis(), 3),
                },
                "distribution": "regular" if abs(skewness) < 0.5 else "skewed",
            }

        value_counts = target_data.value_counts()
        probabilities = value_counts / len(target_data)
        # The small epsilon keeps log2 finite for vanishing probabilities.
        entropy = -float((probabilities * np.log2(probabilities + 1e-10)).sum())
        return {
            "sort": "categorical",
            "unique_values": len(value_counts),
            "most_common": value_counts.head(5).to_dict(),
            "entropy": round(entropy, 3),
        }

    def _generate_recommendations(self, df: pd.DataFrame, insights: Dict) -> List[str]:
        """Turn the collected ``insights`` into a list of suggestion strings.

        Suggestions are added when more than 10% of all cells are missing,
        when strong correlations were found, when clustering produced a
        cluster count, and when any column has more than 5% IQR outliers.
        A single default message is returned when none of these apply.
        """
        suggestions = []

        total_cells = df.shape[0] * df.shape[1]
        missing_cells = sum(insights["dataset_info"]["missing_values"].values())
        missing_pct = missing_cells / total_cells * 100 if total_cells else 0.0
        if missing_pct > 10:
            suggestions.append(f"Contemplate information imputation - {missing_pct:.1f}% lacking values detected")

        if "correlation_analysis" in insights and insights["correlation_analysis"].get("strong_correlations"):
            suggestions.append("Robust correlations detected - contemplate function choice or dimensionality discount")

        if "clustering_analysis" in insights:
            cluster_info = insights["clustering_analysis"]
            if isinstance(cluster_info, dict) and "optimal_clusters" in cluster_info:
                suggestions.append(f"Information segments into {cluster_info['optimal_clusters']} distinct teams - helpful for focused methods")

        if "outlier_detection" in insights:
            # Entries that are not dicts (the "message" fallback) are skipped.
            # Labels are stringified because column names need not be strings.
            high_outlier_cols = [
                str(col)
                for col, stats in insights["outlier_detection"].items()
                if isinstance(stats, dict) and stats.get("outlier_percentage", 0) > 5
            ]
            if high_outlier_cols:
                suggestions.append(f"Excessive outlier proportion in: {', '.join(high_outlier_cols)} - examine information high quality")

        return suggestions if suggestions else ["Information seems well-structured with no fast considerations"]

    def _create_analysis_summary(self, insights: Dict, suggestions: List[str]) -> str:
        """Render the text summary returned as the tool's content.

        The summary reports the dataset dimensions, the number of columns
        whose dtype name contains ``int``/``float`` or ``object``, a fixed list
        of insight headlines and the first three ``suggestions``.
        """
        dataset_info = insights["dataset_info"]
        rows, cols = dataset_info["form"][0], dataset_info["form"][1]
        dtype_names = dataset_info["dtypes"].values()
        numeric_count = sum(1 for t in dtype_names if "int" in t or "float" in t)
        categorical_count = sum(1 for t in dtype_names if "object" in t)
        top_suggestions = "\n".join("• " + rec for rec in suggestions[:3])

        lines = [
            "📊 INTELLIGENT DATA ANALYSIS COMPLETE",
            f"Dataset Overview: {rows} rows × {cols} columns",
            f"Numeric Options: {numeric_count}",
            f"Categorical Options: {categorical_count}",
            "Key Insights Generated:",
            "• Statistical correlations and relationships recognized",
            "• Clustering patterns found for segmentation",
            "• Outlier detection accomplished for information high quality evaluation",
            "• Characteristic significance and distribution evaluation carried out",
            "Prime Suggestions:",
            top_suggestions,
            "Evaluation consists of ML-powered clustering, statistical correlations, and actionable enterprise insights.",
        ]
        return "\n".join(lines)

    def _find_elbow_point(self, inertias: List[float], k_range: range) -> int:
        """Pick a cluster count from the inertia curve.

        With fewer than three inertia values the first candidate k is
        returned. Otherwise the k that follows the largest drop in inertia
        between consecutive candidates is returned. An empty ``k_range``
        falls back to 1.
        """
        candidates = list(k_range)
        if not candidates:
            return 1
        if len(inertias) < 3:
            return candidates[0]
        drops = [prev - cur for prev, cur in zip(inertias, inertias[1:])]
        # drops[i] is the improvement from candidates[i] to candidates[i + 1].
        return candidates[drops.index(max(drops)) + 1]