基于CNN实现ECG心率失常分类

数据来源

MIT-BIH arrhythmia database

代码

利用注释信息切割心电图

import wfdb 
import glob, os
import numpy as np

def get_records():
    """ Get paths for data in data/mit/ directory """
    # Download if doesn't exist
    # There are 3 files for each record
    # *.atr is one of them
    paths = glob.glob('F:/data/*.atr')
    # Get rid of the extension
    paths = [path[:-4] for path in paths]
    paths.sort()
    return paths

def beat_annotations(annotation):
    """ Get rid of non-beat markers """
    """'N' for normal beats. Similarly we can give the input 'L' for left bundle branch block beats. 'R' for right bundle branch block
        beats. 'A' for Atrial premature contraction. 'V' for ventricular premature contraction. '/' for paced beat. 'E' for Ventricular
        escape beat."""
    good = ['N']   
    ids = np.in1d(annotation.symbol, good)
    # We want to know only the positions
    beats = annotation.sample[ids]
    return beats
  
def segmentation(records):
    Normal = []
    for e in records:
        signals, fields = wfdb.rdsamp(e, channels = [0]) 
        ann = wfdb.rdann(e, 'atr')
        good = ['N']
        ids = np.in1d(ann.symbol, good)
        imp_beats = ann.sample[ids]
        beats = (ann.sample)
        for i in imp_beats:
            beats = list(beats)
            j = beats.index(i)
            if(j!=0 and j!=(len(beats)-1)):
                x = beats[j-1]
                y = beats[j+1]
                diff1 = abs(x - beats[j])//2
                diff2 = abs(y - beats[j])//2
                Normal.append(signals[beats[j] - diff1: beats[j] + diff2, 0])
    return Normal

annotation = wfdb.rdann(get_records()[0], 'atr')
beat_annotations(annotation)
segmentation(get_records())

利用R峰切割心电图

from biosppy.signals import ecg
import biosppy
signals, fields = wfdb.rdsamp(get_records()[0], channels=[0])
display(signals)
display(fields)
out = ecg.ecg(signal=signals.flatten(), sampling_rate=1000., show=True)
data = signals.flatten()
signals = []
count = 1
peaks =  biosppy.signals.ecg.christov_segmenter(signal=data, sampling_rate = 200)[0]
for i in (peaks[1:-1]):
    diff1 = abs(peaks[count - 1] - i)
    diff2 = abs(peaks[count + 1]- i)
    x = peaks[count - 1] + diff1//2
    y = peaks[count + 1] - diff2//2
    signal = data[x:y]
    signals.append(signal)
    count += 1

ECG信号转ECG图像

import matplotlib.pyplot as plt
import cv2
for count, i in enumerate(signals):
  fig = plt.figure(frameon=False)
  plt.plot(i) 
  plt.xticks([]), plt.yticks([])
  for spine in plt.gca().spines.values():
     spine.set_visible(False)

  filename = "F:/data" + '/' + str(count)+'.png'
  fig.savefig(filename)
  im_gray = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
  im_gray = cv2.resize(im_gray, (128, 128), interpolation = cv2.INTER_LANCZOS4)
  cv2.imwrite(filename, im_gray)

数据增强(Data Augmentation)

image = cv2.imread('F:/data/1.png')
filename="1.png"
def cropping(image, filename):
    
    #Left Top Crop
    crop = image[:96, :96]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'leftTop' + '.png', crop)
    
    #Center Top Crop
    crop = image[:96, 16:112]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'centerTop' + '.png', crop)
    
    #Right Top Crop
    crop = image[:96, 32:]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'rightTop' + '.png', crop)
    
    #Left Center Crop
    crop = image[16:112, :96]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'leftCenter' + '.png', crop)
    
    #Center Center Crop
    crop = image[16:112, 16:112]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'centerCenter' + '.png', crop)
    
    #Right Center Crop
    crop = image[16:112, 32:]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'rightCenter' + '.png', crop)
    
    #Left Bottom Crop
    crop = image[32:, :96]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'leftBottom' + '.png', crop)
    
    #Center Bottom Crop
    crop = image[32:, 16:112]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'centerBottom' + '.png', crop)
    
    #Right Bottom Crop
    crop = image[32:, 32:]
    crop = cv2.resize(crop, (128, 128))
    cv2.imwrite(filename[:-4] + 'rightBottom' + '.png', crop)

图像数据导入

import random
import cv2
import os
import numpy as np
from keras.utils import to_categorical
from imutils import paths
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import img_to_array

def load_data2(path):
  print("[INFO] loading images...")
  data = []
  labels = []
  # grab the image paths and randomly shuffle them
  imagePaths = sorted(list(paths.list_images(path)))
  random.seed(42)
  random.shuffle(imagePaths)
  # loop over the input images
  for imagePath in imagePaths:
    # load the image, pre-process it, and store it in the data list
    image = cv2.imread(imagePath)
    #image = cv2.resize(image, (norm_size, norm_size))
    image = img_to_array(image)
    data.append(image)

    # extract the class label from the image path and update the
    # labels list
    label = int(imagePath.split(os.path.sep)[-2])    
    labels.append(label)  
     
  # scale the raw pixel intensities to the range [0, 1]
  data = np.array(data, dtype="float") / 255.0
  labels = np.array(labels)


  # partition the data into training and testing splits using 75% of
  # the data for training and the remaining 25% for testing
  (trainX, testX, trainY, testY) = train_test_split(data,
      labels, test_size=0.25, random_state=42)

  # convert the labels from integers to vectors
  trainY = to_categorical(trainY, num_classes=2)
  testY = to_categorical(testY, num_classes=2)  
  return trainX,trainY,testX,testY

trainX,trainY,testX,testY = load_data2("F:\\pic\\")

VGGNET模型搭建

from keras.models import Sequential
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import Conv2D
from keras.layers import MaxPool2D
from keras.layers.core import Activation, Flatten, Dropout, Dense
from keras import backend as K
import keras 

model = Sequential()
model.add(Conv2D(64, (3,3),strides = (1,1), input_shape = (128,128,3),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(Conv2D(64, (3,3),strides = (1,1),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2), strides= (2,2)))
model.add(Conv2D(128, (3,3),strides = (1,1),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(Conv2D(128, (3,3),strides = (1,1),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2), strides= (2,2)))
model.add(Conv2D(256, (3,3),strides = (1,1),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(Conv2D(256, (3,3),strides = (1,1),kernel_initializer='glorot_uniform'))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=(2, 2), strides= (2,2)))
model.add(Flatten())
model.add(Dense(2048))
model.add(keras.layers.ELU())
model.add(BatchNormalization())
model.add(Dropout(0.5))
model.add(Dense(2, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.compile(optimizer='sgd',loss='categorical_crossentropy',metrics=['accuracy'])  
model.summary()  
model.fit(trainX,trainY,validation_data=(testX,testY),batch_size=20,epochs=20,verbose=2)  
print(model.evaluate(testX,testY,batch_size=20,verbose=2))  

问题

Q:spyder中让生成的图像在单独窗口中显示

A:Tools-->Preferences-->IPython console-->Graphics-->Graphics backend--> Backend设置成Automatic,format设置成SVG

参考资料

[01]. ECG arrhythmia classification using a 2-D convolutional neural network

[02]. Demo Scripts for the wfdb-python package

[03]. Deep-Learning-Based-ECG-Annotator