Source code for morphforgecontrib.simulation_analysis.spikinggrouping

#!/usr/bin/python
# -*- coding: utf-8 -*-

# ---------------------------------------------------------------------
# Copyright (c) 2012 Michael Hull.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  - Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  - Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
#  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# ----------------------------------------------------------------------

import numpy as np
from morphforge.traces.eventset import EventSet


[docs]class DBScan(object): @classmethod
[docs] def query_region(cls, p, eps, pts): nr_indices = np.nonzero(np.fabs(pts - p) < eps)[0] return set(nr_indices)
@classmethod
[docs] def cluster_points(cls, pts, eps, min_pts): pts = np.array(pts) visited_indices = [False] * np.zeros(len(pts)) noise = [] clusters = [] for (i, pt) in enumerate(pts): if visited_indices[i]: continue visited_indices[i] = 1 n_points = cls.query_region(p=pt, eps=eps, pts=pts) if len(n_points) < min_pts: noise.append(i) else: new_cluster = set() cls.expand_cluster(pt_index=i, npoints=n_points, cluster=new_cluster, eps=eps, pts=pts, min_pts=min_pts, visited_indices=visited_indices, clusters=clusters) clusters.append(new_cluster) return (clusters, noise)
@classmethod
[docs] def expand_cluster(cls, pt_index, npoints, cluster, pts, eps, visited_indices, min_pts, clusters): cluster.add(pt_index) iter_n = list(npoints) while iter_n: pdash_index = iter_n.pop() if not visited_indices[pdash_index]: visited_indices[pdash_index] = 1 n_dash = cls.query_region(p=pts[pdash_index], eps=eps, pts=pts) if len(n_dash) >= min_pts: for n in n_dash: iter_n.append(n) pt_in_clusters = [True for c in clusters if pdash_index in c] if not pt_in_clusters: cluster.add(pdash_index)
@classmethod
[docs] def cluster_spike_times(cls, event_set, eps, min_pts=5): eps = float(eps.rescale('ms').magnitude) data = [float(ev.get_time().rescale('ms')) for ev in event_set] (clusters, noise) = DBScan.cluster_points(pts=np.array(data), eps=eps, min_pts=min_pts) # Create new eventsets for each cluster new_eventsets = [] for cluster in clusters: e = EventSet() for c in cluster: e.add_event(event_set[c]) new_eventsets.append(e) # Create a new EventSet for the noise points: noise_event_set = EventSet(events=[event_set[i] for i in noise]) return new_eventsets, noise_event_set
@classmethod
[docs] def calculate_mean_frequency(cls, cluster_sets): mean_times = [np.mean([t.rescale('ms') for t in c.times]) for c in cluster_sets if len(c) != 0] mean_times = np.array([mt for mt in mean_times if mt > 200]) np.sort(mean_times) mean_times.sort() print mean_times isi = np.diff(mean_times) freq = 1000.0 / isi mean_freq = np.mean(freq) return mean_freq