00001 #ifndef Impala_Core_Array_ReadRaw_h
00002 #define Impala_Core_Array_ReadRaw_h
00003
00004 #include <vector>
00005 #include "Persistency/File.h"
00006 #include "Util/Database.h"
00007 #include "Core/Array/Element/TypeString.h"
00008 #include "Basis/NativeTypeRead.h"
00009 #include "Core/Array/ArrayType.h"
00010 #include "Core/Array/Arrays.h"
00011 #include "Core/Array/Endian.h"
00012
00013 namespace Impala
00014 {
00015 namespace Core
00016 {
00017 namespace Array
00018 {
00019
00020
00021 inline ArrayType
00022 ReadRawArrayType(Util::IOBuffer* buffer)
00023 {
00024 ILOG_VAR(Impala.Core.Array.ReadRawArrayType);
00025 char buf[200];
00026 buffer->Read(buf, 200);
00027 String line(buf);
00028
00029
00030 String::size_type p = line.find("type:");
00031 if (p == String::npos)
00032 {
00033 ILOG_ERROR("type not found");
00034 return ARRAY2DUNKNOWN;
00035 }
00036 p += 6;
00037 String::size_type p2 = line.find(',', p);
00038 String type = line.substr(p, p2-p);
00039
00040
00041 p = line.find("elemSize:");
00042 if (p == String::npos)
00043 {
00044 ILOG_ERROR("elemSize not found");
00045 return ARRAY2DUNKNOWN;
00046 }
00047 p += 10;
00048 p2 = line.find(',', p);
00049 String elemSizeStr = line.substr(p, p2-p);
00050 Int64 elemSize = atoi(elemSizeStr);
00051
00052
00053 switch (elemSize)
00054 {
00055 case 1:
00056 if (type == "uint8")
00057 return ARRAY2DSCALARUINT8;
00058 if (type == "int32")
00059 return ARRAY2DSCALARINT32;
00060 if (type == "uint64")
00061 return ARRAY2DSCALARUINT64;
00062 if (type == "real32")
00063 return ARRAY2DSCALARREAL32;
00064 if (type == "real64")
00065 return ARRAY2DSCALARREAL64;
00066 break;
00067 case 2:
00068 if (type == "real64")
00069 return ARRAY2DVEC2REAL64;
00070 break;
00071 case 3:
00072 if (type == "uint8")
00073 return ARRAY2DVEC3UINT8;
00074 if (type == "real64")
00075 return ARRAY2DVEC3REAL64;
00076 break;
00077 }
00078 ILOG_ERROR("unknown type " << type << " elemSize " << elemSize);
00079 return ARRAY2DUNKNOWN;
00080 }
00081
00082 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00083 inline ArrayType
00084 ReadRawArrayType(String fileName, Util::Database* db)
00085 {
00086 Util::IOBuffer* buf = db->GetIOBuffer(fileName, true, false, "", 0, true);
00087 ArrayType res = ReadRawArrayType(buf);
00088 delete buf;
00089 return res;
00090 }
00091 #endif // REPOSITORY_USED
00092
00093 inline ArrayType
00094 ReadRawArrayType(Persistency::File file)
00095 {
00096 Util::IOBuffer* buf = file.GetReadBuffer();
00097 ArrayType res = ReadRawArrayType(buf);
00098 delete buf;
00099 return res;
00100 }
00101
00102 template<class ArrayT>
00103 inline bool
00104 ReadRawHeader(Util::IOBuffer* ioBuf, Int64* version, Int64* binary,
00105 Int64* elemSize, Int64* width, Int64* height, Int64* nrA,
00106 String& compressor,
00107 Int64* linesPerCompressedBlock)
00108 {
00109 typedef typename ArrayT::StorType StorT;
00110
00111 ILOG_VAR(Impala.Core.Array.ReadRawHeader);
00112 char buf[200];
00113 for (int i=0 ; i<200 ; i++)
00114 buf[i] = 0;
00115 ioBuf->Gets(buf, 200);
00116 int res = sscanf(buf, "version: %lld", version);
00117 if (res != 1)
00118 {
00119 ILOG_ERROR("Could not determine version");
00120 return false;
00121 }
00122
00123 *linesPerCompressedBlock = 0;
00124 String typeStr = Element::TypeString<StorT>(0);
00125 if (*version == 1)
00126 {
00127 String formatStr = "version: 1, binary: %lld, type: " + typeStr +
00128 ", elemSize: %lld, width: %lld, height: %lld, nr: %lld";
00129 int res = sscanf(buf, formatStr.c_str(),
00130 binary, elemSize, width, height, nrA);
00131 if ((res != 5) || (*elemSize != ArrayT::ElemSize()))
00132 {
00133 ILOG_ERROR("Header type 1 is not compatible: res==" << res <<
00134 " (should be 5), elemSize==" << elemSize << " (should be "
00135 << ArrayT::ElemSize() << "), buf==[" << buf << "]");
00136 return false;
00137 }
00138 return true;
00139 }
00140 if (*version == 2)
00141 {
00142 String formatStr = "version: 2, binary: %lld, type: " + typeStr +
00143 ", elemSize: %lld, nr: %lld";
00144 int res = sscanf(buf, formatStr.c_str(), binary, elemSize, nrA);
00145 if ((res != 3) || (*elemSize != ArrayT::ElemSize()))
00146 {
00147 ILOG_ERROR("Header type 2 is not compatible: res==" << res <<
00148 " (should be 3), elemSize==" << elemSize << " (should be "
00149 << ArrayT::ElemSize() << "), buf==[" << buf << "]");
00150 return false;
00151 }
00152 return true;
00153 }
00154 if (*version == 3)
00155 {
00156 char compressor2[5];
00157 String formatStr = "version: 3, binary: %lld, type: " + typeStr +
00158 ", elemSize: %lld, width: %lld, height: %lld, nr: %lld, %4s: %lld";
00159 int res = sscanf(buf, formatStr.c_str(),
00160 binary, elemSize, width, height, nrA, compressor2,
00161 linesPerCompressedBlock);
00162 compressor2[4] = '\0';
00163 compressor = String(compressor2);
00164 if ((res != 7) || (*elemSize != ArrayT::ElemSize()))
00165 {
00166 ILOG_ERROR("Header type 3 is not compatible: res==" << res <<
00167 " (should be 6), elemSize==" << elemSize << " (should be "
00168 << ArrayT::ElemSize() << "), buf==[" << buf << "]");
00169 return false;
00170 }
00171 return true;
00172 }
00173 ILOG_ERROR("Unknown version " << *version);
00174 return false;
00175 }
00176
00177 template<class ArrayT>
00178 inline void
00179 ReadRaw(ArrayT*& dst, Util::IOBuffer* buffer)
00180 {
00181 typedef typename ArrayT::StorType StorT;
00182
00183 ILOG_VAR(Impala.Core.Array.ReadRaw);
00184 Int64 version;
00185 Int64 binary;
00186 Int64 elemSize;
00187 Int64 width;
00188 Int64 height;
00189 Int64 nrA;
00190 String compressor;
00191 Int64 linesPerCompressedBlock;
00192 Util::IOBuffer::PositionType startPos = buffer->GetPosition();
00193 if (! ReadRawHeader<ArrayT>(buffer, &version, &binary, &elemSize, &width,
00194 &height, &nrA, compressor,
00195 &linesPerCompressedBlock))
00196 {
00197 return;
00198 }
00199
00200 if (dst == 0)
00201 dst = ArrayCreate<ArrayT>(width, height);
00202 StorT* dstPtr = dst->CPB(0, 0);
00203 if (binary)
00204 {
00205 Int64 lineSize = sizeof(StorT) * width * elemSize;
00206 buffer->SetPosition(startPos + 200);
00207 for (Int64 n=0 ; n<nrA ; n++)
00208 {
00209 if (linesPerCompressedBlock > 0)
00210 {
00211 int y = 0;
00212 while (y < height)
00213 {
00214 int end = y + linesPerCompressedBlock;
00215 if (end > height)
00216 end = height;
00217 int dY = end - y;
00218 dstPtr = dst->CPB(0, y);
00219 size_t nrBytes = lineSize * dY;
00220 size_t nrRead = 0;
00221 if(compressor == "bzip")
00222 {
00223 nrRead = buffer->ReadBzip(dstPtr, nrBytes);
00224 }
00225 else if(compressor == "zlib")
00226 {
00227 nrRead = buffer->ReadZlib(dstPtr, nrBytes);
00228 }
00229 else
00230 {
00231 ILOG_ERROR("Unknown compressor: " << compressor);
00232 }
00233 if (nrRead != nrBytes)
00234 {
00235 ILOG_ERROR("read " << nrRead << " bytes instead of "
00236 << nrBytes);
00237 }
00238 y += dY;
00239 }
00240 }
00241 else
00242 {
00243 size_t nrBytes = lineSize * height;
00244 size_t nrRead = buffer->Read(dstPtr, nrBytes);
00245 if (nrRead != nrBytes)
00246 {
00247 ILOG_ERROR("read " << nrRead << " bytes instead of "
00248 << nrBytes);
00249 }
00250 }
00251 }
00252 Endian(dst, dst);
00253 }
00254 else
00255 {
00256 for (Int64 n=0 ; n<nrA ; n++)
00257 {
00258 for (Int64 i=0 ; i<height ; i++)
00259 {
00260 for (Int64 j=0 ; j<width ; j++)
00261 {
00262 for (Int64 k=0 ; k<elemSize ; k++)
00263 {
00264 buffer->NativeTypeRead(dstPtr);
00265 dstPtr++;
00266 }
00267 }
00268 }
00269 }
00270 buffer->ReadLine();
00271 }
00272 }
00273
00274 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00275 template<class ArrayT>
00276 inline void
00277 ReadRaw(ArrayT*& dst, String fileName, Util::Database* db)
00278 {
00279
00280 Util::IOBuffer* buf = db->GetIOBuffer(fileName, true, false, "", 0, true);
00281 if (buf)
00282 {
00283 ReadRaw(dst, buf);
00284 delete buf;
00285 }
00286 }
00287 #endif // REPOSITORY_USED
00288
00289 template<class ArrayT>
00290 inline void
00291 ReadRaw(ArrayT*& dst, Persistency::File file)
00292 {
00293 Util::IOBuffer* buf = file.GetReadBuffer();
00294 if (buf)
00295 {
00296 ReadRaw(dst, buf);
00297 delete buf;
00298 }
00299 }
00300
00301 template<class ArrayT>
00302 inline void
00303 ReadRawList(std::vector<ArrayT*>& list, Util::IOBuffer* buffer)
00304 {
00305 typedef typename ArrayT::StorType StorT;
00306 ILOG_VAR(Impala.Core.Array.ReadRawList);
00307
00308 Int64 version;
00309 Int64 binary;
00310 Int64 elemSize;
00311 Int64 width;
00312 Int64 height;
00313 Int64 nrA;
00314 String compressor;
00315 Int64 linesPerCompressedBlock;
00316 Util::IOBuffer::PositionType startPos = buffer->GetPosition();
00317 if (! ReadRawHeader<ArrayT>(buffer, &version, &binary, &elemSize, &width,
00318 &height, &nrA, compressor,
00319 &linesPerCompressedBlock))
00320 {
00321 return;
00322 }
00323
00324 for (Int64 n=0 ; n<nrA ; n++)
00325 if (n+1 > list.size())
00326 list.push_back(ArrayCreate<ArrayT>(width, height));
00327 if (binary)
00328 {
00329 Int64 lineSize = sizeof(StorT) * width * elemSize;
00330 buffer->SetPosition(startPos + 200);
00331 for (Int64 n=0 ; n<nrA ; n++)
00332 {
00333 if (linesPerCompressedBlock > 0)
00334 {
00335 int y = 0;
00336 while (y < height)
00337 {
00338 int end = y + linesPerCompressedBlock;
00339 if (end > height)
00340 end = height;
00341 int dY = end - y;
00342 StorT* dstPtr = list[n]->CPB(0, y);
00343 size_t nrBytes = lineSize * dY;
00344 size_t nrRead = 0;
00345 if(compressor == "bzip")
00346 {
00347 nrRead = buffer->ReadBzip(dstPtr, nrBytes);
00348 }
00349 else if(compressor == "zlib")
00350 {
00351 nrRead = buffer->ReadZlib(dstPtr, nrBytes);
00352 }
00353 else
00354 {
00355 ILOG_ERROR("Unknown compressor: " << compressor);
00356 }
00357 if (nrRead != nrBytes)
00358 {
00359 ILOG_ERROR("read " << nrRead << " bytes instead of "
00360 << nrBytes);
00361 }
00362 y += dY;
00363 }
00364 }
00365 else
00366 {
00367 StorT* dstPtr = list[n]->CPB(0, 0);
00368 size_t nrBytes = lineSize * height;
00369 size_t nrRead = buffer->Read(dstPtr, nrBytes);
00370 if (nrRead != nrBytes)
00371 {
00372 ILOG_ERROR("read " << nrRead << " bytes instead of "
00373 << nrBytes);
00374 }
00375 }
00376 Endian(list[n], list[n]);
00377 }
00378 }
00379 else
00380 {
00381 for (Int64 n=0 ; n<nrA ; n++)
00382 {
00383 for (Int64 i=0 ; i<height ; i++)
00384 {
00385 StorT* dstPtr = list[n]->CPB(0, i);
00386 for (Int64 j=0 ; j<width ; j++)
00387 {
00388 for (Int64 k=0 ; k<elemSize ; k++)
00389 {
00390 buffer->NativeTypeRead(dstPtr);
00391 dstPtr++;
00392 }
00393 }
00394 }
00395 }
00396 }
00397 }
00398
00399 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00400 template<class ArrayT>
00401 inline void
00402 ReadRawList(std::vector<ArrayT*>& list, String fileName, Util::Database* db)
00403 {
00404
00405 Util::IOBuffer* buf = db->GetIOBuffer(fileName, true, false, "", 0, true);
00406 if (buf)
00407 {
00408 ReadRawList(list, buf);
00409 delete buf;
00410 }
00411 }
00412 #endif // REPOSITORY_USED
00413
00414 template<class ArrayT>
00415 inline void
00416 ReadRawList(std::vector<ArrayT*>& list, Persistency::File file)
00417 {
00418 Util::IOBuffer* buf = file.GetReadBuffer();
00419 if (buf)
00420 {
00421 ReadRawList(list, buf);
00422 delete buf;
00423 }
00424 }
00425
00426 template<class ArrayT>
00427 inline void
00428 ReadRawListVar(std::vector<ArrayT*>& list, Util::IOBuffer* buffer)
00429 {
00430 typedef typename ArrayT::StorType StorT;
00431
00432 ILOG_VAR(Impala.Core.Array.ReadRawListVar);
00433 Int64 ver;
00434 Int64 binary;
00435 Int64 elemSize;
00436 Int64 nrA;
00437 String compressor;
00438 Int64 linesPerCompressedBlock;
00439 Util::IOBuffer::PositionType startPos = buffer->GetPosition();
00440 if (! ReadRawHeader<ArrayT>(buffer, &ver, &binary, &elemSize, 0, 0, &nrA,
00441 compressor,
00442 &linesPerCompressedBlock))
00443 {
00444 return;
00445 }
00446 if (binary)
00447 {
00448 buffer->SetPosition(startPos + 200);
00449 for (Int64 n=0 ; n<nrA ; n++)
00450 {
00451 Int64 width;
00452 Int64 height;
00453 char buf[20];
00454 buffer->Read(buf, 20);
00455 sscanf(buf, "%lld %lld ", &width, &height);
00456 size_t blockSize = width*height*elemSize;
00457 ArrayT* a = ArrayCreate<ArrayT>(width, height);
00458 StorT* dstPtr = a->CPB(0, 0);
00459 size_t nrRead = buffer->Read(dstPtr, sizeof(StorT) * blockSize);
00460 if (nrRead != sizeof(StorT) * blockSize)
00461 {
00462 ILOG_ERROR("read " << nrRead << " bytes instead of "
00463 << sizeof(StorT) * blockSize);
00464 }
00465 Endian(a, a);
00466 list.push_back(a);
00467 }
00468 }
00469 else
00470 {
00471 for (Int64 n=0 ; n<nrA ; n++)
00472 {
00473 Int64 width;
00474 Int64 height;
00475 buffer->NativeTypeRead(&width);
00476 buffer->NativeTypeRead(&height);
00477 Int64 blockSize = width*height*elemSize;
00478 ArrayT* a = ArrayCreate<ArrayT>(width, height);
00479 for(Int64 i=0 ; i<height ; i++)
00480 {
00481 StorT* dstPtr = a->CPB(0, i);
00482 for(Int64 j=0 ; j<width ; j++)
00483 {
00484 for (Int64 k=0 ; k<elemSize ; k++)
00485 {
00486 buffer->NativeTypeRead(dstPtr);
00487 dstPtr++;
00488 }
00489 }
00490 }
00491 list.push_back(a);
00492 }
00493 }
00494 }
00495
00496 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00497 template<class ArrayT>
00498 inline void
00499 ReadRawListVar(std::vector<ArrayT*>& list, String fileName, Util::Database* db)
00500 {
00501
00502 Util::IOBuffer* buf = db->GetIOBuffer(fileName, true, false, "", 0, true);
00503 if (buf)
00504 {
00505 ReadRawListVar(list, buf);
00506 delete buf;
00507 }
00508 }
00509 #endif // REPOSITORY_USED
00510
00511 template<class ArrayT>
00512 inline void
00513 ReadRawListVar(std::vector<ArrayT*>& list, Persistency::File file)
00514 {
00515 Util::IOBuffer* buf = file.GetReadBuffer();
00516 if (buf)
00517 {
00518 ReadRawListVar(list, buf);
00519 delete buf;
00520 }
00521 }
00522
00523
00524 template<class ArrayT>
00525 inline void
00526 ReadRawListVarIndex(std::vector<Int64>& list, Util::IOBuffer* buffer)
00527 {
00528 typedef typename ArrayT::StorType StorT;
00529
00530 ILOG_VAR(Impala.Core.Array.ReadRawListVarIndex);
00531 String typeStr = Element::TypeString<StorT>(0);
00532 String formatStr = "version: 2, binary: %lld, type: " + typeStr +
00533 ", elemSize: %lld, nr: %lld";
00534
00535 Int64 binary;
00536 Int64 elemSize;
00537 Int64 nrA;
00538 char buf[200];
00539 Util::IOBuffer::PositionType startPos = buffer->GetPosition();
00540 buffer->Gets(buf, 200);
00541 int res = sscanf(buf, formatStr.c_str(), &binary, &elemSize, &nrA);
00542 if ((res != 3) || (elemSize != ArrayT::ElemSize()))
00543 {
00544 ILOG_ERROR("Header is not compatible");
00545 return;
00546 }
00547 if (binary)
00548 {
00549 buffer->SetPosition(startPos + 200);
00550 for (Int64 n=0 ; n<nrA ; n++)
00551 {
00552 Int64 curPos = buffer->GetPosition();
00553 list.push_back(curPos);
00554 Int64 width;
00555 Int64 height;
00556 buffer->Read(buf, 20);
00557 sscanf(buf, "%lld %lld ", &width, &height);
00558 size_t blockSize = width*height*elemSize;
00559 buffer->SetPosition(buffer->GetPosition() + blockSize);
00560 }
00561 }
00562 else
00563 {
00564 ILOG_ERROR("mode not implemented");
00565 }
00566 }
00567
00568 #ifndef REPOSITORY_USED // Here comes the deprecated stuff
00569 template<class ArrayT>
00570 inline void
00571 ReadRawListVarIndex(std::vector<Int64>& list, String fileName, Util::Database* db)
00572 {
00573
00574 Util::IOBuffer* buf = db->GetIOBuffer(fileName, true, false, "", 0, true);
00575 if (buf)
00576 {
00577 ReadRawListVarIndex<ArrayT>(list, buf);
00578 delete buf;
00579 }
00580 }
00581 #endif // REPOSITORY_USED
00582
00583 template<class ArrayT>
00584 inline void
00585 ReadRawListVarIndex(std::vector<Int64>& list, Persistency::File file)
00586 {
00587 Util::IOBuffer* buf = file.GetReadBuffer();
00588 if (buf)
00589 {
00590 ReadRawListVarIndex<ArrayT>(list, buf);
00591 delete buf;
00592 }
00593 }
00594
00595 }
00596 }
00597 }
00598
00599 #endif