python による dicom サーバー

フリーの dicom サーバーは dcm4chee が最強かもしれませんが、クラッシュに備えて python による自前の dicom サーバーも動かしておきます。

全体のイメージ

以下のような感じです。

左側の NAS 上に dcm4chee でディレクトリ構造を作成された DICOM というフォルダがありその中に多数の dicom ファイルが格納されています。

データベースそのものは NAS 上にはありませんが、DICOM の構造は windows server 上にある dcm4chee のデータベースが規定しています。(この図には windows server は描いてありません)

プロセス

  • 左側の NAS から dicom ファイルを ubuntu に scp ダウンロードしてタグ情報を読み取り、新しい構造 pDICOM を作成します。
  • タグ情報から必要なものを抽出して、ubuntu の mysql に書き込みます。
  • ubuntu 上の pDICOM をディレクトリごと NAS に scp アップロードします。

データベースとテーブル作成

ubuntu 上でデータベースとテーブルを作成します。


mysql> create database pydcmdb;
mysql> use pydcmdb;

mysql> CREATE TABLE `dirlist` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dir` date DEFAULT NULL,
  `hizuke` datetime DEFAULT NULL,
  `extime` varchar(20) DEFAULT NULL,
  `status` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=148 DEFAULT CHARSET=utf8;

python プログラム


import paramiko
import scp
import glob
import pydicom
import os
import re
import pymysql
import shutil
import datetime
from datetime import timedelta, date
import time

class Dicom:
    
    def __init__( self ):
        """NAS1"""
        self.NASfrom = '192.168.0.72'
        self.portFrom = 22
        self.uNamefrom = 'nas1user' 
        self.pWordfrom = 'nas1pass'
        self.PATHfrom = '/volume1/DICOM/' 
        """NAS2"""
        self.NASto =  '192.168.0.11'
        self.portTo = 22 
        self.uNameto = 'nas2user'
        self.pWordto = 'nas2pass' 
        self.PATHto = '/volume1/pDICOM/'
        self.temp= '/var/www/html/dtemp/'
        self.localpydcm = '/var/www/html/pDICOM/'
        """ etc """
        self.host = 'localhost'
        self.user = 'root'
        self.db = 'pydcmdb'
        self.password = 'pswd'
        self.bdir = '/var/www/html/DICOM/'
        self.startDate = datetime.date(2017, 11, 30)
        self.start = time.time()
        self.end = 0
        self.delDir(self.temp)
          
    @classmethod         
    def delDir( self, dir ):
      for root, dirs, files in os.walk(dir, topdown=False):
          for name in files:
              os.remove(os.path.join(root, name))
          for name in dirs:
              os.rmdir(os.path.join(root, name))
 
    def getDicomArr( self, sd ):
        DicomArr = []
        for root, dir, files in os.walk( sd ):
            for file_ in files:
                full_path = os.path.join(root, file_)
                DicomArr.append( full_path )
        return DicomArr
    
    @classmethod 
    def getBdir(self, ef ):
        pattern = '\d{4}/\d{1,2}/\d{1,2}'
        res = re.search(pattern, ef)
        return res.group()
    
    """ dicom ファイルのタグ情報を読み取る """
    def getDicomInfo( self, dcmArr ):
        dcmInfoArr = []
        studyIDArr =[]
        for eachFile in dcmArr:           
            ds = pydicom.read_file(eachFile)
            studyID = ds[0x0020, 0x0010].value
            if studyID not in studyIDArr:        
                studyDate = ds[0x0008, 0x0020].value
                studyTime = ds[0x0008, 0x0030].value
                modality = ds[0x0008, 0x0060].value
                try:
                    studyDscr = ds[0x0008, 0x1030].value
                except:
                    studyDscr = ''
                ptName = str(ds[0x0010, 0x0010].value).replace('^', ' ')
                karteNo = ds[0x0010, 0x0020].value
                sex = ds[0x0010, 0x0040].value
                birthday = ds[0x0010, 0x0030].value
                age = ds[0x0010, 0x1010].value
                institution = ds[0x0008, 0x0080].value
                bdir = Dicom.getBdir(eachFile)
                path = bdir + '/' + karteNo + '/' + studyID 
                thisLineInfo = [ studyID, studyDate, studyTime, modality, studyDscr, ptName, karteNo, birthday, sex, age, institution, path ]
                dcmInfoArr.append( thisLineInfo )
                studyIDArr.append( studyID )
        return dcmInfoArr
    
    """ dicom のタグ情報を mysql に記録する """
    def insertDB( self, DicomInfoArr ):           
        conn = pymysql.connect(host=self.host,
            user=self.user,
            db=self.db,
            password=self.password,
            cursorclass=pymysql.cursors.DictCursor)
        try:
            with conn.cursor() as cursor:
                sql = "REPLACE INTO taginfo VALUES ( %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s )"
                cursor.executemany( sql, DicomInfoArr )
            conn.commit()
        finally:
            conn.close()

    def countDicom( self, dcmPath ):           
        searchDicom = dcmPath + '/*.dcm'
        files = []  
        files = glob.glob(searchDicom)   
        count = len(files)                   
        return count
        
    def copyDicomFile( self, dicomArr ):
        dicomFileDir = self.localpydcm
        for eachFile in dicomArr:
            ds = pydicom.read_file(eachFile)
            studyID = ds[0x0020, 0x0010].value
            karteNo = ds[0x0010, 0x0020].value
            firstDir = Dicom.getBdir( eachFile )
            destinationDir = self.localpydcm + firstDir + '/' + karteNo + '/' + studyID
            os.makedirs(destinationDir, exist_ok=True)
            dicomCount = self.countDicom(destinationDir)
            newFileName = studyID + "_%05d.dcm" %(dicomCount)
            destination = destinationDir + '/' + newFileName
            shutil.copyfile(eachFile, destination)
    
    """ 処理されたファイルの最大値、これは日付を表している """      
    def maxPath( self ):
        conn = pymysql.connect(host=self.host,
            user=self.user,
            db=self.db,
            password=self.password,
            cursorclass=pymysql.cursors.DictCursor)
        try:
            with conn.cursor() as cursor:
                sql = "SELECT max(dir) as mdir FROM dirlist WHERE status = 'finish' ;"
                cursor.execute(sql)
                res = cursor.fetchone()       
                maxpath = res['mdir']
                if not maxpath :
                    maxpath = self.startDate
                return maxpath
        finally:
            conn.close()  
                     
    """ 処理が終わったらそれを記録する """
    def recStatus( self, path ):           
        conn = pymysql.connect(host=self.host,
            user=self.user,
            db=self.db,
            password=self.password,
            cursorclass=pymysql.cursors.DictCursor)
        try:
            with conn.cursor() as cursor:
                sql = "REPLACE INTO dirlist ( dir, hizuke, extime, status) VALUES ( %s, %s, %s, %s )"
                dcm.endTime()
                extime = dcm.calTime()
                thisTime = datetime.datetime.now()
                data = [path, thisTime, extime, 'finish']
                cursor.execute( sql, data )
            conn.commit()
        finally:
            conn.close()
            
    def calTime(self):
        time = self.end - self.start
        return time

    def endTime(self):
        self.end = time.time()
        
    """ scp get と put """
    def scp_get( self, searchDir ): 
        Dicom.delDir( self.temp )
        Dicom.delDir( self.localpydcm )  
        with paramiko.SSHClient() as ssh:
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=self.NASfrom, port=self.portFrom, username=self.uNamefrom, password=self.pWordfrom)            
            with scp.SCPClient(ssh.get_transport()) as scpc:                              
                remoteDir = self.PATHfrom + searchDir
                localDir = self.temp + searchDir
                os.makedirs(localDir)
                try:
                    scpc.get( remote_path = remoteDir, local_path = localDir, recursive = True)
                except:
                    pass

    def scp_put( self ):       
        with paramiko.SSHClient() as ssh:
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=self.NASto, port=self.portTo, username=self.uNameto, password=self.pWordto)            
            with scp.SCPClient(ssh.get_transport()) as scpc:                
                scpc.put( self.localpydcm, self.PATHto, recursive = True)
                
    """ 実行条件を決定する """                
    def execution(self, nxDate ):
        today = date.today()
        thisHour = datetime.datetime.now().hour
        if 10 < thisHour and thisHour < 18 and nxDate < today:
            return True
                      
if __name__ == "__main__":
    dcm = Dicom() 
    exec = dcm.execution( dcm.maxPath() )
    """ exec が true である限り実行する """
    while exec:
        dcm.start = time.time()
        nxDate = dcm.maxPath() + timedelta(days=1)
        tdir = nxDate.strftime('%Y/%-m/%-d')

        """ NAS からローカルに dicom ファイルを scp ダウンロードする"""
        dcm.scp_get(tdir)
        """ dicom ファイル情報を取得してそれに応じてディレクトリ構造を作成し dicom ファイルを rename してコピーする """
        dcmArr = dcm.getDicomArr( dcm.temp )
        infos = dcm.getDicomInfo( dcmArr )
        dcm.insertDB( infos )
        dcm.copyDicomFile( dcmArr ) 
        """ 新しい dicom のファイル構造を全体として別の NAS に scp アップロードする """
        dcm.scp_put()
        """ 処理した情報をデータベースに書き込む """
        dcm.recStatus( nxDate )
        """ 実行条件の確認 """
        exec = dcm.execution( nxDate )