计算机上的二大应用,一是从WEB服务器上获得数据,另一种是向关系数据库中写入数据。在上集我已提出了一个从WEB上获得OPC数据的独创方法,现在谈谈第二种如何快速地把OPC数据写进到数据库中,这也是Calssic OPC最典型的一个应用场景。
使用基金会提供的基于.NET的ADO.NET无疑不是一个最快最有效率的办法,原因是显而易见的。要想速度快,必然要考虑到原生的基于COM的数据库技术,比如OLE DB,ADO或者ODBC。根据《ADO ActiveX Data Objects》一书描述的三者架构关系图,
显然,ADO是对OLE DB技术的上一层封装,是对当时OLE DB技术上的繁琐和难以找到熟悉COM的开发人员的一种妥协。自然,ADO的表现要比OLE DB逊色一些。不同于微软私有的ADO/OLE DB技术,ODBC是一个国际标准,有通用的接口,但性能上还是比OLE DB差了些。原因有三:第一,ODBC诞生在1992年,OLE DB出现在1996年,当年微软是想用它代替ODBC的,所以OLE DB在设计上有后发优势。第二,ODBC和OLEDB都有BIND的功能,比如ODBC有SQLBindCol()函数调用,而OLE DB不一样,要自己亲手写BIND,看上去很繁琐。其实也正是这样的繁琐保证了它性能上的优越。第三,最重要的一点,ODBC是工作在不同的查询语句上的,比如INSERT,UPDATE等,所以服务端需要进行解析。OLE DB可以使用查询语句,也可以不使用查询语句而完成INSERT、UPDATE等操作——没有了服务端的解析,自然就快了许多。有人做过测试,用ODBC的INSERT语句完成十万行的插入,而OLE DB没有使用任何INSERT语句,OLE DB比ODBC快了至少一倍以上。再多聊一些OLE DB的历史,当年没能成功替代ODBC,微软宣布准备让它退出底层的原生数据库编程应用,但是有众多厂家反对再加上OLE DB自身的性能优势,非常符合云时代的要求。所以在2017年微软宣布重新支持OLE DB的编程技术并发布了新一代的OLE DB驱动程序。新的驱动加上了加密功能,更能适应于云生时代。
虽然OLE DB性能优越,但繁琐的code让人望而生却,有没有办法?答案是 Active Template Library(ATL),它封装了很多繁琐的OLE DB底层调用,即起到防止内存泄漏,又帮你写出又快又好的程序。
本样本程序使用最新的OLE DB驱动程序,给出一个有INSERT语句的完整演示,完成快速地把OPC数据复制到数据库中,然后再展现出存贮在数据库中的所有数据。
int main(int argc, CHAR* argv[]) {CoInitializeEx(NULL, COINIT_MULTITHREADED);{CDataSource dataSource;CSession session;const WCHAR szUDLFile[] = L"OPCDA.udl";HRESULT hr = dataSource.OpenFromFileName(szUDLFile);if (FAILED(hr)) {printf("OpenFromFileName() failed\n");goto END;}hr = session.Open(dataSource);if (FAILED(hr)){printf("Open() failed\n");dataSource.Close();goto END;}CLSID cidOpcServer;if (FAILED(listServers(cidOpcServer))){printf("listServers() failed\n");dataSource.Close();goto END;}if (FAILED(DA(cidOpcServer, session))) {printf("DA() failed\n");dataSource.Close();goto END;}printf("\nretrieving rows from database...\n\n");displayResult(session);dataSource.Close();}system("pause");END:CoUninitialize();return(EXIT_SUCCESS);
}
这是主程序,运行在多线程状态,这样后面OPC的DataCallBack可以运行在另一个单独的线程中,否则全部都使用一个主线程。
接下来是根据UDL的文件设定来连接数据库。这个UDL的文件如下,
[oledb]
; Everything after this line is an OLE DB initstring
Provider=MSOLEDBSQL19.1;Integrated Security=SSPI;Persist Security Info=False;Initial Catalog=TEST;Data Source=localhost;Use Encryption for Data=Optional;
可以看到,使用了最新的19版本OLE DB的驱动(对应的是msoledbsql19.dll),指定了相应的数据库名和服务器名,不使用用户名和密码作为身份验证手段,同时不要求数据进行加密。有一点要注意的是,当通过UDL界面保存设置时,可能会有很多的属性存在这个UDL文件中,会造成OpenFromFileName()的失败,所以只要留最少的如上属性即可。
OpenFromFileName()是ATL提供的API,帮助获得第一个基于IDataInitialize接口的实例,然后根据UDL连接属性建立和数据库的连接,然后初始化基于IDBInitialize的实例如下,
HRESULT OpenFromFileName(_In_z_ LPCOLESTR szFileName) throw()
{CComPtr<IDataInitialize> spDataInit;CComHeapPtr<OLECHAR> spszInitString;HRESULT hr = CoCreateInstance(__uuidof(MSDAINITIALIZE), NULL, CLSCTX_INPROC_SERVER,__uuidof(IDataInitialize), (void**)&spDataInit);if (FAILED(hr))return hr;hr = spDataInit->LoadStringFromStorage(szFileName, &spszInitString);if (FAILED(hr))return hr;return OpenFromInitializationString(spszInitString);
}
// Open the datasource specified by the passed initialization string
HRESULT OpenFromInitializationString(_In_z_ LPCOLESTR szInitializationString,_In_ bool fPromptForInfo = false) throw()
{CComPtr<IDataInitialize> spDataInit;HRESULT hr = CoCreateInstance(__uuidof(MSDAINITIALIZE), NULL, CLSCTX_INPROC_SERVER,__uuidof(IDataInitialize), (void**)&spDataInit);if (FAILED(hr))return hr;hr = spDataInit->GetDataSource(NULL, CLSCTX_INPROC_SERVER, szInitializationString,__uuidof(IDBInitialize), (IUnknown**)&m_spInit);if (FAILED(hr))return hr;if( fPromptForInfo ){CComPtr<IDBProperties> spIDBProperties;hr = m_spInit->QueryInterface( &spIDBProperties );DBPROP rgProperties[1];DBPROPSET rgPropertySets[1];VariantInit(&rgProperties[0].vValue);rgProperties[0].dwOptions = DBPROPOPTIONS_REQUIRED;rgProperties[0].colid = DB_NULLID;rgProperties[0].dwPropertyID = DBPROP_INIT_PROMPT;rgProperties[0].vValue.vt = VT_I2;rgProperties[0].vValue.lVal = DBPROMPT_COMPLETEREQUIRED;rgPropertySets[0].rgProperties = rgProperties;rgPropertySets[0].cProperties = 1;rgPropertySets[0].guidPropertySet = DBPROPSET_DBINIT;hr = spIDBProperties->SetProperties( 1, rgPropertySets );if (FAILED(hr))return hr;}return m_spInit->Initialize();
}
注意下这里CLSID用的是MSDAINITIALIZE,搜索注册表显示的是
也就是从oledb32.dll的地址空间中先获得IDataInitialize的实例,再调用GetDataSource()来获得IDBInitialize的指针,所以这个m_spInit指针也是在oledb32.dll的地址空间中,只不过它同时也加载了msoledbsql19.dll中的相应接口。
回到主程序,session.Open()也是ATL的API,主要是为了获得IOpenRowset的指针如下,
HRESULT Open(_In_ const CDataSource& ds,_Inout_updates_opt_(ulPropSets) DBPROPSET *pPropSet = NULL,_In_ ULONG ulPropSets = 0) throw()
{CComPtr<IDBCreateSession> spSession;// Check we have connected to the databaseATLASSERT(ds.m_spInit != NULL);HRESULT hr = ds.m_spInit->QueryInterface(__uuidof(IDBCreateSession), (void**)&spSession);if (FAILED(hr))return hr;hr = spSession->CreateSession(NULL, __uuidof(IOpenRowset), (IUnknown**)&m_spOpenRowset);if( pPropSet != NULL && SUCCEEDED(hr) && m_spOpenRowset != NULL ){// If the user didn't specify the default parameter, use oneif (pPropSet != NULL && ulPropSets == 0)ulPropSets = 1;CComPtr<ISessionProperties> spSessionProperties;hr = m_spOpenRowset->QueryInterface(__uuidof(ISessionProperties), (void**)&spSessionProperties);if(FAILED(hr))return hr;hr = spSessionProperties->SetProperties( ulPropSets, pPropSet );}return hr;
}
接下来的主程序是关于OPC的操作,listServers()是为了获得本机上OPC DA的CLSID,如下,
HRESULT listServers(CLSID& cidOpcServer)
{ULONG fetched = 0;HRESULT hr = S_OK;CComHeapPtr<OLECHAR> bsProgID, lpszUserType, lpszVerIndProgID;CATID arrcatid[3] = { NULL };arrcatid[0] = __uuidof(CATID_OPCDAServer10);arrcatid[1] = __uuidof(CATID_OPCDAServer20);arrcatid[2] = __uuidof(CATID_OPCDAServer30);CComPtr<IOPCServerList2> spIOPCServerList2;if (FAILED(hr = spIOPCServerList2.CoCreateInstance(__uuidof(OpcServerList), spIOPCServerList2, CLSCTX_ALL))){printf("CoCreateInstance() for IOPCServerList2 failed\n");return hr;}CComPtr<IOPCEnumGUID> spEnum;hr = spIOPCServerList2->EnumClassesOfCategories(sizeof arrcatid / sizeof CATID, arrcatid, 0, NULL, &spEnum);if (spEnum.p){while ((hr = spEnum->Next(1, &cidOpcServer, &fetched)) == S_OK){hr = spIOPCServerList2->GetClassDetails(cidOpcServer, &bsProgID, &lpszUserType, &lpszVerIndProgID);if (FAILED(hr)) {_tprintf(_T("GetClassDetails() failed\n"));return hr;}break;}}return hr;
}
此段程序也不复杂,获得一个IOPCServerList2的实例,然后对相应的OPC类别进行枚举,再在枚举中循环得到本机的OPC DA的CLSID。
有了DA的CLSID后,开始对DA进行操作,比如创建一个实例,建立一个新组,创建一个回调函数,通知服务端,加入感兴趣的TAG,暂停等待回调函数的结束。具体见下,
HRESULT DA(CLSID& cidOpcServer, CSession& session) {CComPtr<IOPCServer> pIOPCServer;HRESULT hr = pIOPCServer.CoCreateInstance(cidOpcServer, pIOPCServer, CLSCTX_ALL);if (FAILED(hr)) {printf("CoCreateInstance() for IOPCServer failed\n");return E_FAIL;}DWORD dwRevisedUpdateRate = 0;OPCHANDLE hGroup = 0;CComPtr<IOPCItemMgt> pOPCItemMgt;hr = pIOPCServer->AddGroup(L"", TRUE, 1000, NULL, NULL, NULL, LOCALE_SYSTEM_DEFAULT, &hGroup, &dwRevisedUpdateRate, __uuidof(IOPCItemMgt), (LPUNKNOWN*)&pOPCItemMgt);if (FAILED(hr)) {printf("AddGroup() failed\n");return E_FAIL;}DataCallback* pDataCallback = new DataCallback(session);pDataCallback->AddRef();DWORD m_dwCookie;AtlAdvise(pOPCItemMgt, pDataCallback, __uuidof(IOPCDataCallback), &m_dwCookie);hr = addItems(pOPCItemMgt);if (FAILED(hr)) {printf("addItems() failed\n");return E_FAIL;}printf("\npress any key to complete inserting rows to database\n");getchar();AtlUnadvise(pOPCItemMgt, __uuidof(IOPCDataCallback), m_dwCookie);pDataCallback->Release();return S_OK;
}
下面具体看下回调函数,它的作用是当TAG的值有变化时,此函数被唤醒在另一线程执行,返回的参数包括TAG的值,时间戳和状态,如下,
STDMETHODIMP OnDataChange(DWORD dwTransid,OPCHANDLE hGroup,HRESULT hrMasterquality,HRESULT hrMastererror,DWORD dwCount,OPCHANDLE* phClientItems,VARIANT* pvValues,WORD* pwQualities,FILETIME* pftTimeStamps,HRESULT* pErrors
)
{CCommand<CManualAccessor> command;CComVariant vVariant[4];vVariant[0].vt = VT_BSTR;vVariant[1].vt = VT_R4;vVariant[2].vt = VT_DATE;vVariant[3].vt = VT_UINT;hr = command.CreateParameterAccessor(4, vVariant, sizeof vVariant); if (FAILED(hr)) {printf("command.CreateParameterAccessor() failed");return hr;}for (DWORD ii = 0; ii < dwCount; ii++){CComVariant vValue;WORD quality = pwQualities[ii] & OPC_QUALITY_MASK;COleDateTime oleTime = COleDateTime(pftTimeStamps[ii]);SYSTEMTIME st;oleTime.GetAsSystemTime(st);if (phClientItems[ii] == 0)CComBSTR("Random.Int1").CopyTo(&vVariant[0].bstrVal);if (phClientItems[ii] == 1)CComBSTR("Random.Int2").CopyTo(&vVariant[0].bstrVal);else if (phClientItems[ii] == 2)CComBSTR("Random.Real8").CopyTo(&vVariant[0].bstrVal);vVariant[1].fltVal = (FLOAT)pvValues[ii].dblVal;vVariant[2].date = oleTime;vVariant[3].iVal = quality;command.m_nCurrentParameter = 0;command.AddParameterEntry(1, DBTYPE_BSTR, NULL, &vVariant[0].bstrVal);command.AddParameterEntry(2, DBTYPE_R4, NULL, &vVariant[1].fltVal);command.AddParameterEntry(3, DBTYPE_DATE, NULL, &vVariant[2].date);command.AddParameterEntry(4, DBTYPE_UI2, NULL, &vVariant[3].iVal);/*This is not the most efficient and fastest way to insert a row to database due to query building/parsing and commit each time.To bulk insert, interface of IRowsetFastLoad has to be used and it is quite different from this code example.Contact developer to have a code example using IRowsetFastLoad, so you can completely understand the big difference between IDBInitialize and IDataInitialize interfaces when trying to get a pointer to IRowsetFastLoad.*/hr = command.Open(session, "insert into OPCDA (Tag, Value, Time, Quality) Values (?,?,?,?)", NULL, NULL);if (FAILED(hr)) {printf("command.Open() failed");break;}elseprintf("\nOnDataChange: %S (%f, %s.%d, %s)", vVariant[0].bstrVal, vVariant[1].fltVal, oleTime.Format("%F %T").GetString(), st.wMilliseconds, quality == OPC_QUALITY_GOOD ? "good" : "bad");SysFreeString(vVariant[0].bstrVal);}return hr;
}
这段程序中使用了ATL提供的CCommand,然后用CreateParameterAccessor()构建一个关于查询语句参数的存取器。这也是个ATL的函数,不再展开讨论,主要是执行有关参数的BIND,具体可以参见它的源代码。然后根据OPC提供的返回值的数目进行循环,取出每一个TAG的值、时间戳和状态,结合TAG名称来满足INSERT语句四个参数的要求,最后使用ATL的Open()完成INSERT语句的执行。
回到主程序,完成了INSERT的操作,下一步是从数据库中把刚才插入的数据取出来展示,
void displayResult(CSession &session) {CCommand<CManualAccessor> command;const USHORT uColumns = 4;CComVariant vValues[uColumns]{};HRESULT hr = command.CreateAccessor(uColumns, vValues, sizeof vValues);if (FAILED(hr)){printf("CreateAccessor() failed\n");return;}for (ULONG l = 0; l < uColumns; l++){command.AddBindEntry(l + 1, DBTYPE_VARIANT, NULL, &vValues[l], NULL, NULL);}hr = command.Open(session, "select * from OPCDA", NULL, NULL);if (FAILED(hr)){printf("command.Open() failed\n");return;}ULONG count = 0;while (command.MoveNext() == S_OK) {CComVariant* pBind = (CComVariant*)command.m_pBuffer;count++;COleDateTime dateTime(pBind[2].date);printf("%S (%f, %s, %s)\n", pBind[0].bstrVal, pBind[1].fltVal, dateTime.Format("%F %T").GetString(), pBind[3].iVal == OPC_QUALITY_GOOD ? "good" : "bad");;}printf("\nTotal rows: %d\n", count);
}
这段的所有操作都是调用ATL的API,先是CreateAccessor()构建个无参数的存取器,也就是建立一个BIND,供返回的数据存在内存中用。一个Open()语句完成数据的获得,再进行个循环依次展示获得的值。注意一点,返回的是一行的值,有四列。
运行后的结果如下,
综观这一程序,由于有了ATL的加持OLE DB的编程不再那么困难。ATL带来了便利,但也掩盖了对底层OLE DB的理解。每次的INSERT操作都伴随着COMMIT,显然不是最快和最有效率的OLE DB编程方法。也是基于此微软当年(2012年)在新版的Native Client 驱动中引入了IRowsetFastLoad接口,专门进行批量插入。此接口也非常简单只有二个函数,InsertRow()和Commit(),即多次调用InsertRow(),然后一次性地Commit()。为了深入理解更底层的OLE DB编程,我又独自开发了基于IRowsetFastLoad的OPC范例。本以为和这个程序差不多,没想到却被打脸。在开发过程中让我体会到使用IDataInitialize和IDBInitialize实例来获取IRowsetFastLoad指针的巨大不同,对老版的OLE DB驱动sqloledb.dll,老版的Native Client驱动sqlncli11.dll和新版的OLE DB驱动msoledbsql19.dll三者之间的关系有了进一步的了解。在进行完整的BIND过程中也领会到最原始的底层ORM的美(相对于高级语言的ORM,如Hibernate或Entity Framework),这种底层ORM和内存布局直接呼应,没有任何INSERT语句却能快速地完成批量插入,真是“不著一字,尽得风流”。感兴趣的同学可以邮箱联系我获取一份范例,在关键处我都加了注释来加深对OLE DB和COM的编程理解,确保获益满满。
本范例已经在GITHUB开源,下载在此。