Cannot retrieve TCP packets (bytes) correctly

Hi there everyone,

I have a problem with sending and receiving TCP bytes. Clients send a request to the server, and the server bundles the request together with its query results (a stream converted into bytes) into a single buffer that it sends back to the client. On the client side, the client receives the response and splits the received buffer in two: a fixed-length buffer for the protocol request (this is just a packed record) and a second buffer containing the stream with the request results.

However, the size of the data received on the client side rarely matches the data sent by the server. I use 
{code}
ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)]));
{code}
to compare what is sent to what is received.

An abridged version of the server side code is shown below. I used a code snippet that Remy used on this web page to concatenate the two buffers into one buffer for sending to the client: [http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays] 

{code}
procedure TfrmServeur.TCPServerExecute(AContext: TIdContext);
var
  // temporary buffer
  LProtocolBuffer, LCollectionBuffer, LBuffer: TBytes;
  // data size in InputBuffer
  LDataSize: Integer;
  // protocol structure
  LProtocol: TProtocol;
  // we need to HARD CAST AContext to TClientContext
  // in order to access our custom methods(procedures)
  LClientContext: TClientContext;
  //
  LStream: TMemoryStream;
begin
  // clear the protocol structure
  InitProtocol(LProtocol);
  // By default, we are not updating a record
  bUpdateRecord := False;
  // Hard cast AContext to TClientContext
  LClientContext := TClientContext(AContext);
  //
  if AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    {$IFDEF CONN_THREAD}
    // if no data is received within 2 minutes, disconnect the client
    // because the client is offline
    if MinutesBetween(Now, TMyData(LClientContext.Data).LastRecvTime) >= 2 then
    begin
      LClientContext.Connection.Disconnect(True);       // Remove client from listview
      Exit;
    end;
    {$ENDIF}
  end
  else if not AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    // store the size of the InputBuffer of the client
    LDataSize := LClientContext.Connection.IOHandler.InputBuffer.Size;
    // in order to prevent spams or to make sure that we have at least
    // the protocol structure sent we check the size of the InputBuffer
    if LDataSize >= szProtocol then
    begin
      //
      LStream := TMemoryStream.Create;
      //
      try
        try
          // Acquire a connection from the connection pool
          // and prepare the query objects
          dm        := AcquireDM;
          qryRead   := dm.qryRead;
          qryWrite  := dm.qryWrite;
          qryRead.SQL.Clear;
          qryWrite.SQL.Clear;
        except
          // Leave the loop because no connection was obtained
          // Resume execution after the << finally >> below
          Abort;
        end;

        //
        try
          // read the protocol structure from the client so we can handle
          // the client's request
          LClientContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol);
          // convert the buffer to protocol structure
          LProtocol := BytesToProtocol(LBuffer);
          // check client command and act accordingly
          case LProtocol.Command of
            {$IFDEF CONN_THREAD}
            cmdKeepAlive:
              begin
                // client is still online so the ping was successful,
                // reset last received time to 'Now'
                TMyData(LClientContext.Data).LastRecvTime := Now;
                // Update listview column - last received time
                with TUpdateUIClientRecvTime.Create do
                begin
                  ConnectionTime := TMyData(LClientContext.Data).ConnectionTime;
                  LastRecvTime := TMyData(LClientContext.Data).LastRecvTime;
                  Notify;
                end;   // with TUpdateUIClientRecvTime.Create do
              end; // cmdKeepAlive: begin
            {$ENDIF}

            //************************************************//
            //
            //      The << Ex >> lists & the simple reports
            //
            //************************************************//
            cmdListeVillesEx, cmdListeBeneficiairesEx:
              begin
                //
                with qryRead do
                begin
                  //
                  LGenerics := TGenerics.Create;
                  //
                  LGenerics.Clear;
                  try
                    SQL.Clear;
                    SQL.Text := SQLQueryString(LProtocol);
                    Open;
                    //
                    while not EOF do
                    begin
                      with LGenerics.Add do
                      begin
                        _Integer := Fields.Fields[0].AsInteger;
                        _String:= Fields.Fields[1].AsWideString;
                      end;
                      Next;
                    end;
                    // close the dataset
                    Close;
                    // copy the collection to a memory stream in preparation for sending to clients
                    SaveCollectionToStream(LGenerics, LStream);
                    //
                    // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo
                    //
                    LStream.Position := 0;
                    //
                    SetLength(LCollectionBuffer, LStream.Size);
                    // copy the stream into the LCollectionBuffer
                    LStream.Read(LCollectionBuffer[0], LStream.Size);
                    //
                    FreeAndNil(LGenerics);
                  except
                    if Active then Close;
                    if Assigned(LGenerics) then FreeAndNil(LGenerics);
                  end;  // trye
                end;    // with qryRead do
              end;      // cmdListeVillesEx et al
          end; // case LProtocol.Command of
        except
          // Is this the reason why there are memory leaks on line 1440 & 1593?
          //
          //// Free the objects created earlier
          //// Free the << collections >>
          //if Assigned(LGenerics) then FreeAndNil(LGenerics);     // Error if LGenerics does not exist
          //// Free the stream
          //if Assigned(LStream) then FreeAndNil(LStream);
        end;
      finally
        // Convert the protocol to bytes
        LProtocolBuffer := ProtocolToBytes(LProtocol);
        // merge the buffers into one buffer
        // Source: http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays
        SetLength(LBuffer, Length(LProtocolBuffer) + Length(LCollectionBuffer));
        if LProtocolBuffer <> nil then Move(LProtocolBuffer[0], LBuffer[0], Length(LProtocolBuffer));
        if LCollectionBuffer <> nil then Move(LCollectionBuffer[0], LBuffer[Length(LProtocolBuffer)], Length(LCollectionBuffer));

        //ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)]));

        // send the merged buffer to the client
        LClientContext.Connection.IOHandler.Write(LBuffer);
        // clear all the buffers
        ClearBuffer(LProtocolBuffer);
        ClearBuffer(LCollectionBuffer);
        ClearBuffer(LBuffer);
        // Free LStream
        FreeAndNil(LStream);
        //
        Initialize(LProtocol);
        // Free the data components
        qryRead := nil;
        qryWrite := nil;
        qryRead.Free;
        qryWrite.Free;
        // Return the connection to the connection pool
        ReleaseDM(dm);
        // Reset the << LastRecvTime >> data
        TMyData(LClientContext.Data).LastRecvTime := Now;
        // Update listview column - last received time
        with TUpdateUIClientRecvTime.Create do
        begin
          ConnectionTime := TMyData(LClientContext.Data).ConnectionTime;
          LastRecvTime := TMyData(LClientContext.Data).LastRecvTime;
          Notify;
        end;   // with TUpdateUIClientRecvTime.Create do
      end;     // tryf
      //
    end;  // if LDataSize >= szProtocol then
  end;    // if AContext.Connection.IOHandler.InputBufferIsEmpty then

  // Short pause to keep the server from consuming too much CPU
  IndySleep(10);
end;
{code}

An abridged version of the client side receiving thread is shown below:
{code}
//
// The EXECUTE method for routing server replies
//
// OnError just cancel ALL operations & initialize the form
//
procedure TDataThread.Execute;

      function SliceByteArray(const B: array of Byte; Start, Len: Integer):
        TBytes;
      begin
        if Start < 0 then
          Start := 0;
        if Len < 0 then
          Len := 0
        else if Start >= Length(B) then
          Len := 0
        else if Start + Len > Length(B) then
          Len := Length(B) - Start;
        SetLength(Result, Len);
        if Len > 0 then
          Move(B[Start], Result[0], Len);
      end;

var
  LBuffer, LDataBuffer, LProtocolBuffer: TBytes;
  LProtocol: TProtocol;
  LDataSize: int64;
begin
  //
  FreeOnTerminate := not (IsConnectionThread = True);
  //
  //
  while NOT Terminated and ThreadTCPClient.Connected
    and (LoopCount > 0) and (ThreadError = EmptyStr) do
  begin
    //
    if IsConnectionThread then
    begin
      // Ping the server every minute
      // ONLY the connection thread is able to do this
      if (MinutesBetween(Now, FLastPingTime) >= 1) then
      begin
        //
        Lock;
        // initialize the protocol
        InitProtocol(LProtocol);
        // set the command to cmdKeepAlive
        LProtocol.Command := cmdKeepAlive;
        // Convert the protocol to bytes
        LBuffer := ProtocolToBytes(LProtocol);
        //
        try
          try
            // Send the request bytes to the server
            ThreadTCPClient.IOHandler.Write(LBuffer);
            // reset the last ping time to Now
            LastPingTime := Now;
            // The Synchronize call below can be used as a visual indicator
            // for users to know if they are still online
            //Synchronize(@Self.DoConnectionAlive);
          except
            // Exception handling code here !
            On E: EIdException do begin
              raise EAbort.Create(E.Message);
            end;
          end;
        finally
          //
          Unlock;
          // clear buffer
          ClearBuffer(LBuffer);
        end;
      end;  //  if (MinutesBetween(Now, FLastPingTime) >= 1) then
    end
    else if not IsConnectionThread then
    begin
      if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and
        (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then
      begin
        //
        Lock;
        //
        try
          //
          try
            // Initialize the protocol
            InitProtocol(LProtocol);
            //
            //
            // then read from InputBuffer the size of the protocol structure
            ThreadTCPClient.IOHandler.InputBuffer.ExtractToBytes(LBuffer);
            // move a block of memory from the beginning of LBuffer to the location of the size of the protocol structure
            // LProtocolBuffer
            SetLength(LProtocolBuffer, szProtocol);
            LProtocolBuffer := SliceByteArray(LBuffer, 0, szProtocol);
            // convert array of bytes to protocol
            LProtocol := BytesToProtocol(LProtocolBuffer);
            // move a block of memory from the location after the size of the protocol structure
            // to the end of LBuffer to LCollectionBuffer
            SetLength(LCollectionBuffer, Length(LBuffer) - szProtocol);
            LDataBuffer := SliceByteArray(LBuffer, szProtocol, Length(LBuffer) - szProtocol);

            // The following line is just for debugging. Remove it when all is well
            ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)]));
            //
            LoopCount := LoopCount - 1;
            //
            case LProtocol.Command of
              cmdListeVillesEx:
                begin
                  //
                  ControlName := 'Villes';
                  // Hand the stream over to the main thread
                  Synchronize(@DoListes);
                end;
              cmdListeBeneficiairesEx:
                begin
                  //
                  ControlName := 'Bénéficiaires';
                  // Hand the stream over to the main thread
                  Synchronize(@DoListes);
                end;
            end;  // case LProtocol.Command of
          except
            On E: EIdSocketError do begin
              //ThreadError := strProblemeDeConnection;
              raise EAbort.Create(strProblemeDeConnection)
            end;
            On E: EIdConnClosedGracefully do
            begin
              //ThreadError := strConnectionFermeParServeur;
              raise EAbort.Create(strProblemeDeConnection)
            end;
            //
            On E: Exception do begin
              raise EAbort.Create(E.Message)
            end;
          end;  // try...except
        finally
          //
          Unlock;
          // clear buffer
          ClearBuffer(LBuffer);
          ClearBuffer(LDataBuffer);
          ClearBuffer(LProtocolBuffer);
          //
          ControlName := '';
        end;  // try...finally
      end;    // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then
    end;
    //
    Sleep(10);
  end;      // while NOT Terminated and ThreadTCPClient.Connected do
end;
{code}

Thanks for your kind assistance.

Jay Dee
Jay
7/24/2015 10:26:41 AM
embarcadero.delphi.winsock

Jay wrote:

> However, the size of the data received on the client side rarely
> matches the data sent by the server.

That is because you have logic errors in your code on both sides.  You are 
not handling the reading/writing of your protocol packets correctly, especially 
in regard to the fact that TCP is a streaming protocol, so there is no guarantee 
that you will have a full protocol packet in the InputBuffer at any given 
time.  This matters particularly on the client side, because your client has 
no idea how many bytes the server is actually sending, especially for the 
collection data.
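
To illustrate the streaming point in isolation, here is a minimal sketch that does not use Indy at all. A TMemoryStream stands in for the socket, and `ChunkedRead`, `ReadExactly` and `DemoRead` are hypothetical names invented for this example. `ChunkedRead` deliberately hands back at most a few bytes per call, which is how a TCP read can behave: a single read may return only part of what the peer sent, so the receiver has to loop until it has the number of bytes it expects.

```pascal
program ReadExactlySketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

// Hypothetical stand-in for one socket read: like TCP, it may hand back
// fewer bytes than requested (here at most AMaxChunk per call).
function ChunkedRead(ASource: TStream; var ABuffer;
  ACount, AMaxChunk: Integer): Integer;
begin
  if ACount > AMaxChunk then
    ACount := AMaxChunk;
  Result := ASource.Read(ABuffer, ACount);
end;

// Keeps reading until exactly ACount bytes have arrived.
function ReadExactly(ASource: TStream; ACount, AMaxChunk: Integer): TBytes;
var
  LDone, LGot: Integer;
begin
  SetLength(Result, ACount);
  LDone := 0;
  while LDone < ACount do
  begin
    LGot := ChunkedRead(ASource, Result[LDone], ACount - LDone, AMaxChunk);
    if LGot <= 0 then
      raise Exception.Create('Stream ended before the full message arrived');
    Inc(LDone, LGot);
  end;
end;

// Puts a 10-byte message on the simulated wire and returns how many bytes
// come back from one read (ALooped = False) or from the loop (ALooped = True).
function DemoRead(ALooped: Boolean): Integer;
var
  LWire: TMemoryStream;
  LMessage, LScratch: TBytes;
begin
  SetLength(LMessage, 10);   // the "sent" message, zero-filled
  SetLength(LScratch, 10);
  LWire := TMemoryStream.Create;
  try
    LWire.WriteBuffer(LMessage[0], Length(LMessage));
    LWire.Position := 0;
    if ALooped then
      Result := Length(ReadExactly(LWire, 10, 4))
    else
      Result := ChunkedRead(LWire, LScratch[0], 10, 4);
  finally
    LWire.Free;
  end;
end;

begin
  WriteLn('single read: ', DemoRead(False), ' of 10 bytes');
  WriteLn('looped read: ', DemoRead(True), ' of 10 bytes');
end.
```

With Indy you do not normally write this loop yourself: IOHandler.ReadBytes is given a byte count and blocks until that many bytes have arrived (or the read times out or the connection closes).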

Try something more like this:

{code}
procedure TfrmServeur.TCPServerConnect(AContext: TIdContext);
begin
  {$IFDEF CONN_THREAD}
  AContext.Connection.IOHandler.ReadTimeout := 120000;
  {$ENDIF}
end;

procedure TfrmServeur.TCPServerExecute(AContext: TIdContext);
type
  PBytes = ^TBytes;
var
  // temporary buffer
  LProtocolBuffer, LCollectionBuffer, LBuffer: TIdBytes;
  // protocol structure
  LProtocol: TProtocol;
  //
  LStream: TMemoryStream;
  dm: TMyDataModule; // replace this with your actual DM class type
begin
  // clear the protocol structure
  InitProtocol(LProtocol);
  // By default, we are not updating a record
  bUpdateRecord := False;
  //
  // read the protocol structure from the client so we can handle
  // the client's request
  AContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol);
  // convert the buffer to protocol structure
  LProtocol := BytesToProtocol(PBytes(@LBuffer)^);

  // check client command and act accordingly
  case LProtocol.Command of
    {$IFDEF CONN_THREAD}
    cmdKeepAlive:
    begin
      // client is still online so the ping was successful
    end; // cmdKeepAlive: begin
    {$ENDIF}
    //************************************************//
    //
    //      The << Ex >> lists & the simple reports
    //
    //************************************************//
    cmdListeVillesEx, cmdListeBeneficiairesEx:
    begin
      //
      try
        // Acquire a connection from the connection pool
        // and prepare the query objects
        dm := AcquireDM;
        try
          dm.qryRead.SQL.Clear;
          dm.qryWrite.SQL.Clear;
        except
          ReleaseDM(dm);
          raise;
        end;
      except
        // Leave the loop because no connection was obtained
        // Resume execution after the << finally >> below
        Abort;
      end;
      //
      try
        with dm.qryRead do
        begin
          //
          LGenerics := TGenerics.Create;
          try
            //
            LGenerics.Clear;
            SQL.Text := SQLQueryString(LProtocol);
            Open;
            //
            try
              while not EOF do
              begin
                with LGenerics.Add do
                begin
                  _Integer := Fields.Fields[0].AsInteger;
                  _String:= Fields.Fields[1].AsWideString;
                end;
                Next;
              end;
            finally
              // close the dataset
              Close;
            end; // tryf
            // copy the collection to a memory stream in preparation for sending to clients
            LStream := TMemoryStream.Create;
            try
              SaveCollectionToStream(LGenerics, LStream);
              //
              // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo
              //
              LStream.Position := 0;
              // copy the stream into the LCollectionBuffer
              ReadTIdBytesFromStream(LStream, LCollectionBuffer, LStream.Size);
            finally
              // Free LStream
              LStream.Free;
            end;
          finally
            LGenerics.Free;
          end; // tryf
        end;    // with dm.qryRead do
      finally
        // Return the connection to the connection pool
        ReleaseDM(dm);
      end; // tryf
    end; // cmdListeVillesEx et al
  end; // case LProtocol.Command of

  // Convert the protocol to bytes
  PBytes(@LProtocolBuffer)^ := ProtocolToBytes(LProtocol);
  //MessageBox(0, PChar(Format('Protocol buffer length %d; Collection buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer)])), '', MB_OK);

  // send the total length first, then the two buffers, to the client
  AContext.Connection.IOHandler.Write(Length(LProtocolBuffer)+Length(LCollectionBuffer));
  AContext.Connection.IOHandler.Write(LProtocolBuffer);
  AContext.Connection.IOHandler.Write(LCollectionBuffer);

  // Reset the << LastRecvTime >> data
  TMyData(AContext.Data).LastRecvTime := Now;
  // Update listview column - last received time
  with TUpdateUIClientRecvTime.Create do
  begin
    ConnectionTime := TMyData(AContext.Data).ConnectionTime;
    LastRecvTime := TMyData(AContext.Data).LastRecvTime;
    Notify;
  end;   // with TUpdateUIClientRecvTime.Create do
end;
{code}

{code}
//
// The EXECUTE method for routing server replies
//
// OnError just cancel ALL operations & initialize the form
//
procedure TDataThread.Execute;
type
  PBytes = ^TBytes;
var
  LBuffer, LDataBuffer, LProtocolBuffer: TIdBytes;
  LProtocol: TProtocol;
  LDataSize: Integer;
begin
  //
  FreeOnTerminate := not (IsConnectionThread = True);
  //
  //
  while (NOT Terminated) and ThreadTCPClient.Connected and (LoopCount > 0)
    and (ThreadError = EmptyStr) do
  begin
    //
    if IsConnectionThread then
    begin
      // Ping the server every minute
      // ONLY the connection thread is able to do this
      if (MinutesBetween(Now, FLastPingTime) >= 1) then
      begin
        //
        Lock;
        try
          // initialize the protocol
          InitProtocol(LProtocol);
          // set the command to cmdKeepAlive
          LProtocol.Command := cmdKeepAlive;
          // Convert the protocol to bytes
          PBytes(@LBuffer)^ := ProtocolToBytes(LProtocol);
          //
          try
            // Send the request bytes to the server
            ThreadTCPClient.IOHandler.Write(LBuffer);
            // reset the last ping time to Now
            LastPingTime := Now;
            // The Synchronize call below can be used as a visual indicator
            // for users to know if they are still online
            //Synchronize(@Self.DoConnectionAlive);
          except
            // Exception handling code here !
            On E: EIdException do begin
              raise EAbort.Create(E.Message);
            end;
          end;
        finally
          //
          Unlock;
          //
          SetLength(LBuffer, 0);
        end;
      end;  //  if (MinutesBetween(Now, FLastPingTime) >= 1) then
    end;

    if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then
    begin
      //
      Lock;
      try
        //
        try
          // Initialize the protocol
          InitProtocol(LProtocol);
          //
          // read the 4-byte length prefix sent by the server, then read
          // exactly that many bytes (protocol structure + collection data)
          LDataSize := ThreadTCPClient.IOHandler.ReadLongInt;
          ThreadTCPClient.IOHandler.ReadBytes(LBuffer, LDataSize);
          //
          // copy the first szProtocol bytes of LBuffer into LProtocolBuffer
          LProtocolBuffer := Copy(LBuffer, 0, szProtocol);
          // copy the remaining bytes, from szProtocol to the end of LBuffer,
          // into LDataBuffer
          LDataBuffer := Copy(LBuffer, szProtocol, LDataSize - szProtocol);
          // The following line is just for debugging. Remove it when all is well
          MessageBox(0, PChar(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)])), '', MB_OK);
          //
          Dec(LoopCount);
          //
          // convert array of bytes to protocol
          LProtocol := BytesToProtocol(PBytes(@LProtocolBuffer)^);
          case LProtocol.Command of
            cmdListeVillesEx:
            begin
              //
              ControlName := 'Villes';
              // Hand the stream over to the main thread
              Synchronize(@DoListes);
            end;
            cmdListeBeneficiairesEx:
            begin
              //
              ControlName := 'Bénéficiaires';
              // Hand the stream over to the main thread
              Synchronize(@DoListes);
            end;
          end;  // case LProtocol.Command of
        except
          On E: EIdSocketError do begin
            //ThreadError := strProblemeDeConnection;
            raise EAbort.Create(strProblemeDeConnection)
          end;
          On E: EIdConnClosedGracefully do
          begin
            //ThreadError := strConnectionFermeParServeur;
            raise EAbort.Create(strProblemeDeConnection)
          end;
          //
          On E: Exception do begin
            raise EAbort.Create(E.Message)
          end;
        end;  // try...except
      finally
        //
        Unlock;
        //
        SetLength(LBuffer, 0);
        SetLength(LProtocolBuffer, 0);
        SetLength(LDataBuffer, 0);
        //
        ControlName := '';
      end;  // try...finally
    end;    // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then
  end;      // while NOT Terminated and ThreadTCPClient.Connected do
end;
{code}
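
The framing idea used above (send the total length first, then the bytes) can also be sketched on its own, without Indy. In this minimal example a TMemoryStream again stands in for the connection, and `WriteFrame`, `ReadFrame` and `DemoSecondFrameSize` are hypothetical helper names. The sketch writes the length in host byte order for simplicity; Indy's integer Write/Read overloads convert to and from network byte order by default, so the two are not wire-compatible.

```pascal
program FramingSketch;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

// Hypothetical helper: writes a 4-byte length followed by the payload itself.
procedure WriteFrame(ADest: TStream; const APayload: TBytes);
var
  LLen: Integer;
begin
  LLen := Length(APayload);
  ADest.WriteBuffer(LLen, SizeOf(LLen));
  if LLen > 0 then
    ADest.WriteBuffer(APayload[0], LLen);
end;

// Hypothetical helper: reads the length first, then exactly that many bytes.
// TStream.ReadBuffer raises EReadError if the stream runs short.
function ReadFrame(ASource: TStream): TBytes;
var
  LLen: Integer;
begin
  ASource.ReadBuffer(LLen, SizeOf(LLen));
  if LLen < 0 then
    raise Exception.Create('Invalid frame length');
  SetLength(Result, LLen);
  if LLen > 0 then
    ASource.ReadBuffer(Result[0], LLen);
end;

// Sends two frames back to back and returns the size of the second frame
// as seen by the receiver. Without the length prefix the receiver could
// not tell where the first frame ends and the second begins.
function DemoSecondFrameSize(AFirstSize, ASecondSize: Integer): Integer;
var
  LWire: TMemoryStream;
  LFirst, LSecond: TBytes;
begin
  SetLength(LFirst, AFirstSize);
  SetLength(LSecond, ASecondSize);
  LWire := TMemoryStream.Create;
  try
    WriteFrame(LWire, LFirst);
    WriteFrame(LWire, LSecond);
    LWire.Position := 0;
    LFirst := ReadFrame(LWire);
    LSecond := ReadFrame(LWire);
    Result := Length(LSecond);
  finally
    LWire.Free;
  end;
end;

begin
  WriteLn('second frame size: ', DemoSecondFrameSize(52, 300));
end.
```

The receiver never has to guess: it always knows how many bytes belong to the current message before it starts reading them.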

-- 
Remy Lebeau (TeamB)
Remy
7/24/2015 6:50:06 PM

Thanks a lot, Remy, for your help and the clarity of your answers. Your suggestion worked. Indy has so many functions and procedures; I did not know about ReadTIdBytesFromStream and its counterpart WriteTIdBytesToStream, which I now use on the client side.

I would like your input on some code I use to move collections to and from streams. Sometimes it works and sometimes it fails, and I haven't been able to figure out why.

Here it is:

{code}
//
// Persistent collection methods
// Source: http://users.atw.hu/delphicikk/listaz.php?id=620&oldal=36
procedure LoadCollectionFromStream(Stream: TStream; Collection: TCollection);
begin
  //
  with TReader.Create(Stream, 4096) do
  try
    CheckValue(vaCollection);
    ReadCollection(Collection);
  finally
    Free;
  end;
end;

procedure LoadCollectionFromFile(const FileName: string; Collection: TCollection);
var
  FS: TFileStream;
begin
  FS := TFileStream.Create(FileName, fmOpenRead or fmShareDenyWrite);
  try
    LoadCollectionFromStream(FS, Collection);
  finally
    FS.Free;
  end;
end;

procedure SaveCollectionToStream(Collection: TCollection; Stream: TStream);
begin
  with TWriter.Create(Stream, 4096) do
  try
    WriteCollection(Collection);
  finally
    Free;
  end;
end;

procedure SaveCollectionToFile(Collection: TCollection; const FileName: string);
var
  FS: TFileStream;
begin
  FS := TFileStream.Create(FileName, fmCreate or fmShareDenyWrite);
  try
    SaveCollectionToStream(Collection, FS);
  finally
    FS.Free;
  end;
end;
{code}
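
For reference, a round trip through the two stream helpers can be sketched as a small console program. This is only an illustration under stated assumptions: `TPairItem` is a hypothetical item class standing in for TGeneric, `RoundTripText` is a hypothetical demo function, and the helpers are restated with explicit TWriter/TReader variables. The one detail the sketch highlights is that the reader starts at the stream's current position, so the stream is rewound between saving and loading.

```pascal
program CollectionRoundTrip;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical item class standing in for TGeneric.
  TPairItem = class(TCollectionItem)
  private
    FNumber: Integer;
    FText: string;
  published
    property Number: Integer read FNumber write FNumber;
    property Text: string read FText write FText;
  end;

procedure SaveCollectionToStream(Collection: TCollection; Stream: TStream);
var
  LWriter: TWriter;
begin
  LWriter := TWriter.Create(Stream, 4096);
  try
    LWriter.WriteCollection(Collection);
  finally
    LWriter.Free;  // freeing the writer flushes its buffer into Stream
  end;
end;

procedure LoadCollectionFromStream(Stream: TStream; Collection: TCollection);
var
  LReader: TReader;
begin
  LReader := TReader.Create(Stream, 4096);
  try
    LReader.CheckValue(vaCollection);
    LReader.ReadCollection(Collection);
  finally
    LReader.Free;
  end;
end;

// Saves a one-item collection, rewinds the stream, loads it into a second
// collection and returns the text of the item that was read back.
function RoundTripText(const AText: string): string;
var
  LSource, LTarget: TCollection;
  LStream: TMemoryStream;
  LItem: TPairItem;
begin
  LSource := TCollection.Create(TPairItem);
  LTarget := TCollection.Create(TPairItem);
  LStream := TMemoryStream.Create;
  try
    LItem := TPairItem(LSource.Add);
    LItem.Number := 42;
    LItem.Text := AText;
    SaveCollectionToStream(LSource, LStream);
    LStream.Position := 0;  // the reader starts at the current position
    LoadCollectionFromStream(LStream, LTarget);
    Result := TPairItem(LTarget.Items[0]).Text;
  finally
    LStream.Free;
    LTarget.Free;
    LSource.Free;
  end;
end;

begin
  WriteLn('read back: ', RoundTripText('Villes'));
end.
```

Both collections are created with the same item class, since ReadCollection adds items through the target collection and sets their published properties by name.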

When the client-side thread terminates, the main thread takes over, and the procedure below loads each stream into an array of generic collections (FGenerics: array of TGenerics) that was created earlier in the main form's Create method. The generic collections are destroyed in the main form's Destroy method.

{code}
  { TGeneric }
  TGeneric = class(TBaseItemClass)
  private
    { private declarations }
    FInteger: integer;
    FString: UTF8string;
    function GetPropertyCount: integer; //override;
    function GetPropertyString(Aindex: integer): UTF8string; //override;
  public
    { public declarations }
  published
    { published declarations }
    property _Integer: integer read FInteger write FInteger;
    property _String: UTF8string read FString write FString;
  end;


procedure TfrmChangementSituation.Listes(AStream: TMemoryStream; AName: string);
begin
  //
  try
    //ShowMessage('Stream size is ' + IntToStr(AStream.Size));
    //
    if AStream.Size > 0 then
    begin
      case AName of
        'Mediateur'                : LoadCollectionFromStream(AStream, FGenerics[0]);
        'Bénéficiaires'            : LoadCollectionFromStream(AStream, FGenerics[1]);
        'ResidentFoyer'            : LoadCollectionFromStream(AStream, FGenerics[2]);
        'Patients'                 : LoadCollectionFromStream(AStream, FGenerics[3]);
        'TypeEvenement'            : LoadCollectionFromStream(AStream, FGenerics[4]);
      end;      // case AName of
      //
      Dec(FControlCount);
      //
      frmMain.UpdateProgressBar(1);
      // Fill the GUI controls using the collections
      // if the thread has been terminated
      if FControlCount = 0 then
        FillControls;
    end;  // if AStream.Size > 0 then
  except
    On E: Exception do
      ShowMessage(E.Message);
  end;
end;
{code}

This is where the show stops! Sometimes I get read errors inside of LoadCollectionFromStream, the progressbar freezes & the form remains empty! Is there a better way of doing this? Thanks a lot.

Jay Dee



> {quote:title=Jay Dee wrote:}{quote}
> Hi there everyone,
> 
> I have a problem with sending and receiving TCP bytes. Clients make a request to the server and the server bundles the request as well as the query results (this is a stream converted into bytes) of the request into a buffer for sending to the client. On the client side, the client receives the response and then splits the received buffer into 2: one buffer of fixed length for the protocol request (this is just a packed record) and the other buffer will contain the stream with the request results.
> 
> However, the size of the data received on the client side rarely matches the data sent by the server. I use 
> {code}
> ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)]));
> {code}
> to compare what is sent to what is received.
> 
> An abridged version of the server side code is shown below. I used a code snippet that Remy used on this web page to concatenate the two buffers into one buffer for sending to the client: [http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays] 
> 
> {code}
> procedure TfrmServeur.TCPServerExecute(AContext: TIdContext);
> var
>   // temporary buffer
>   LProtocolBuffer, LCollectionBuffer, LBuffer: TBytes;
>   // data size in InputBuffer
>   LDataSize: Integer;
>   // protocol structure
>   LProtocol: TProtocol;
>   // we need to HARD CAST AContext to TClientContext
>   // in order to access our custom methods(procedures)
>   LClientContext: TClientContext;
>   //
>   LStream: TMemoryStream;
> begin
>   // clear the protocol structure
>   InitProtocol(LProtocol);
>   // Bu default, we are not updating a record
>   bUpdateRecord := False;
>   // Hard cast AContext to TClientContext
>   LClientContext := TClientContext(AContext);
>   //
>   if AContext.Connection.IOHandler.InputBufferIsEmpty then
>   begin
>     {$IFDEF CONN_THREAD}
>     // if no data is received within 2 minutes, disconnect the client
>     // because the client is offline
>     if MinutesBetween(Now, TMyData(LClientContext.Data).LastRecvTime) >= 2 then
>     begin
>       LClientContext.Connection.Disconnect(True);       // Remove client from listview
>       Exit;
>     end;
>     {$ENDIF}
>   end
>   else if not AContext.Connection.IOHandler.InputBufferIsEmpty then
>   begin
>     // store the size of the InputBuffer of the client
>     LDataSize := LClientContext.Connection.IOHandler.InputBuffer.Size;
>     // in order to prevent spams or to make sure that we have at least
>     // the protocol structure sent we check the size of the InputBuffer
>     if LDataSize >= szProtocol then
>     begin
>       //
>       LStream := TMemoryStream.Create;
>       //
>       try
>         try
>           // Obtient une connexion dans le pool des connexions
>           // et prepare les objets pour les requêtes
>           dm        := AcquireDM;
>           qryRead   := dm.qryRead;
>           qryWrite  := dm.qryWrite;
>           qryRead.SQL.Clear;
>           qryWrite.SQL.Clear;
>         except
>           // Sort de la loop car une connexion n'a pas été obtenu
>           // Reprendre l'execution après << finally >> en bas
>           Abort;
>         end;
> 
>         //
>         try
>           // read the protocol structure from the client so we can handle
>           // the client's request
>           LClientContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol);
>           // convert the buffer to protocol structure
>           LProtocol := BytesToProtocol(LBuffer);
>           // check client command and act accordingly
>           case LProtocol.Command of
>             {$IFDEF CONN_THREAD}
>             cmdKeepAlive:
>               begin
>                 // client is still online so the ping was successful,
>                 // reset last received time to 'Now'
>                 TMyData(LClientContext.Data).LastRecvTime := Now;
>                 // Update listview column - last received time
>                 with TUpdateUIClientRecvTime.Create do
>                 begin
>                   ConnectionTime := TMyData(LClientContext.Data).ConnectionTime;
>                   LastRecvTime := TMyData(LClientContext.Data).LastRecvTime;
>                   Notify;
>                 end;   // with TUpdateUIClientRecvTime.Create do
>               end; // cmdKeepAlive: begin
>             {$ENDIF}
> 
>             //************************************************//
>             //
>             //      Les listes << Ex >> & les rapports simple
>             //
>             //************************************************//
>             cmdListeVillesEx, cmdListeBeneficiairesEx:
>               begin
>                 //
>                 with qryRead do
>                 begin
>                   //
>                   LGenerics := TGenerics.Create;
>                   //
>                   LGenerics.Clear;
>                   try
>                     SQL.Clear;
>                     SQL.Text := SQLQueryString(LProtocol);
>                     Open;
>                     //
>                     while not EOF do
>                     begin
>                       with LGenerics.Add do
>                       begin
>                         _Integer := Fields.Fields[0].AsInteger;
>                         _String:= Fields.Fields[1].AsWideString;
>                       end;
>                       Next;
>                     end;
>                     // close the dataset
>                     Close;
>                     // copy the collection to a memory stream in preparation for sending to clients
>                     SaveCollectionToStream(LGenerics, LStream);
>                     //
>                     // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo
>                     //
>                     LStream.Position := 0;
>                     //
>                     SetLength(LCollectionBuffer, LStream.Size);
>                     // copy the stream into the LCollectionBuffer
>                     LStream.Read(LCollectionBuffer[0], LStream.Size);
>                     //
>                     FreeAndNil(LGenerics);
>                   except
>                     if Active then Close;
>                     if Assigned(LGenerics) then FreeAndNil(LGenerics);
>                   end;  // try...except
>                 end;    // with qryRead do
>               end;      // cmdListeVillesEx et al
>           end; // case LProtocol.Command of
>         except
>           // Is this the reason why there are memory leaks on line 1440 & 1593
>           //
>           //// Free the objects created earlier
>           //// Free the << collections >>
>           //if Assigned(LGenerics) then FreeAndNil(LGenerics);     // Error if LGenerics does not exist
>           //// Free the stream
>           //if Assigned(LStream) then FreeAndNil(LStream);
>         end;
>       finally
>         // Convert the protocol to bytes
>         LProtocolBuffer := ProtocolToBytes(LProtocol);
>         // merge the buffers into one buffer
>         // Source: http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays
>         SetLength(LBuffer, Length(LProtocolBuffer) + Length(LCollectionBuffer));
>         if LProtocolBuffer <> nil then Move(LProtocolBuffer[0], LBuffer[0], Length(LProtocolBuffer));
>         if LCollectionBuffer <> nil then Move(LCollectionBuffer[0], LBuffer[Length(LProtocolBuffer)], Length(LCollectionBuffer));
> 
>         //ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)]));
> 
>         // send the merged buffer to the client
>         LClientContext.Connection.IOHandler.Write(LBuffer);
>         // clear all the buffers
>         ClearBuffer(LProtocolBuffer);
>         ClearBuffer(LCollectionBuffer);
>         ClearBuffer(LBuffer);
>         // Free LStream
>         FreeAndNil(LStream);
>         //
>         Initialize(LProtocol);
>         // Release the data components. They belong to dm, so only the
>         // references are dropped here (calling Free on a nil reference
>         // would do nothing anyway)
>         qryRead := nil;
>         qryWrite := nil;
>         // Return the connection to the connection pool
>         ReleaseDM(dm);
>         // Reset the << LastRecvTime >> data
>         TMyData(LClientContext.Data).LastRecvTime := Now;
>         // Update listview column - last received time
>         with TUpdateUIClientRecvTime.Create do
>         begin
>           ConnectionTime := TMyData(LClientContext.Data).ConnectionTime;
>           LastRecvTime := TMyData(LClientContext.Data).LastRecvTime;
>           Notify;
>         end;   // with TUpdateUIClientRecvTime.Create do
>       end;     // try...finally
>       //
>     end;  // if LDataSize >= szProtocol then
>   end;    // if AContext.Connection.IOHandler.InputBufferIsEmpty then
> 
>   // Short pause to keep the server from consuming too much CPU
>   IndySleep(10);
> end;
> {code}
> 
> An abridged version of the client side receiving thread is shown below:
> {code}
> //
> // The EXECUTE method for routing server replies
> //
> // OnError just cancel ALL operations & initialize the form
> //
> procedure TDataThread.Execute;
> 
>       function SliceByteArray(const B: array of Byte; Start, Len: Integer):
>         TBytes;
>       begin
>         if Start < 0 then
>           Start := 0;
>         if Len < 0 then
>           Len := 0
>         else if Start >= Length(B) then
>           Len := 0
>         else if Start + Len > Length(B) then
>           Len := Length(B) - Start;
>         SetLength(Result, Len);
>         if Len > 0 then
>           Move(B[Start], Result[0], Len);
>       end;
> 
> var
>   LBuffer, LDataBuffer, LProtocolBuffer: TBytes;
>   LProtocol: TProtocol;
>   LDataSize: int64;
> begin
>   //
>   FreeOnTerminate := not (IsConnectionThread = True);
>   //
>   //
>   while NOT Terminated and ThreadTCPClient.Connected
>     and (LoopCount > 0) and (ThreadError = EmptyStr) do
>   begin
>     //
>     if IsConnectionThread then
>     begin
>       // Ping the server every minute
>       // ONLY the connection thread is able to do this
>       if (MinutesBetween(Now, FLastPingTime) >= 1) then
>       begin
>         //
>         Lock;
>         // initialize the protocol
>         InitProtocol(LProtocol);
>         // set the command to cmdKeepAlive
>         LProtocol.Command := cmdKeepAlive;
>         // Convert the protocol to bytes
>         LBuffer := ProtocolToBytes(LProtocol);
>         //
>         try
>           try
>             // Send the request to the server as bytes
>             ThreadTCPClient.IOHandler.Write(LBuffer);
>             // reset the last ping time to Now
>             LastPingTime := Now;
>             // The Synchronize call below can be used as a visual indicator
>             // for users to know if they are still online
>             //Synchronize(@Self.DoConnectionAlive);
>           except
>             // Exception handling code here !
>             On E: EIdException do begin
>               raise EAbort.Create(E.Message);
>             end;
>           end;
>         finally
>           //
>           Unlock;
>           // clear buffer
>           ClearBuffer(LBuffer);
>         end;
>       end;  //  if (MinutesBetween(Now, FLastPingTime) >= 1) then
>     end
>     else if not IsConnectionThread then
>     begin
>       if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and
>         (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then
>       begin
>         //
>         Lock;
>         //
>         try
>           //
>           try
>             // Initialize the protocol
>             InitProtocol(LProtocol);
>             //
>             //
>             // extract everything currently held in the InputBuffer into LBuffer
>             ThreadTCPClient.IOHandler.InputBuffer.ExtractToBytes(LBuffer);
>             // copy the first szProtocol bytes of LBuffer into LProtocolBuffer
>             LProtocolBuffer := SliceByteArray(LBuffer, 0, szProtocol);
>             // convert array of bytes to protocol
>             LProtocol := BytesToProtocol(LProtocolBuffer);
>             // copy the bytes that follow the protocol structure, up to the
>             // end of LBuffer, into LDataBuffer
>             LDataBuffer := SliceByteArray(LBuffer, szProtocol, Length(LBuffer) - szProtocol);
> 
>             // The following line is just for debugging. Remove it when all is well
>             ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)]));
>             //
>             LoopCount := LoopCount - 1;
>             //
>             case LProtocol.Command of
>               cmdListeVillesEx:
>                 begin
>                   //
>                   ControlName := 'Villes';
>                   // Hand the stream over to the main thread
>                   Synchronize(@DoListes);
>                 end;
>               cmdListeBeneficiairesEx:
>                 begin
>                   //
>                   ControlName := 'Bénéficiaires';
>                   // Hand the stream over to the main thread
>                   Synchronize(@DoListes);
>                 end;
>             end;  // case LProtocol.Command of
>           except
>             On E: EIdSocketError do begin
>               //ThreadError := strProblemeDeConnection;
>               raise EAbort.Create(strProblemeDeConnection)
>             end;
>             On E: EIdConnClosedGracefully do
>             begin
>               //ThreadError := strConnectionFermeParServeur;
>               raise EAbort.Create(strProblemeDeConnection)
>             end;
>             //
>             On E: Exception do begin
>               raise EAbort.Create(E.Message)
>             end;
>           end;  // try...except
>         finally
>           //
>           Unlock;
>           // clear buffer
>           ClearBuffer(LBuffer);
>           ClearBuffer(LDataBuffer);
>           ClearBuffer(LProtocolBuffer);
>           //
>           ControlName := '';
>         end;  // try...finally
>       end;    // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then
>     end;
>     //
>     Sleep(10);
>   end;      // while NOT Terminated and ThreadTCPClient.Connected do
> end;
> {code}
> 
> Thanks for your kind assistance.
> 
> Jay Dee
0
Jay
7/25/2015 3:21:59 PM
Jay wrote:

> I would like your input concerning some code I use to move collections
> to and from streams. Sometimes they work & sometimes they fail. I
> haven't been able to figure out why?

Before Listes() is called, are you seeking the TMemoryStream's Position back 
to 0?  If not, that can cause LoadCollectionFromStream() to fail.
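
As a rough illustration of why the Position matters (a minimal sketch, not taken from the code in this thread, and assuming Free Pascal, which the Synchronize(@...) syntax in the client code suggests): after a write, a TMemoryStream is positioned at its end, so a read that is not preceded by a rewind gets no bytes at all. RoundTrip is a hypothetical helper used only for this demonstration.

{code}
program StreamPositionDemo;
{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}

uses
  Classes, SysUtils;

// Hypothetical helper: writes AValue into a fresh TMemoryStream and tries
// to read it back. When ARewind is False the read starts at the end of the
// stream, so no bytes are available and the function returns False.
function RoundTrip(AValue: Integer; ARewind: Boolean; out AReadBack: Integer): Boolean;
var
  LStream: TMemoryStream;
begin
  AReadBack := 0;
  LStream := TMemoryStream.Create;
  try
    LStream.WriteBuffer(AValue, SizeOf(AValue));
    // after the write, Position = Size
    if ARewind then
      LStream.Position := 0;
    // Read returns the number of bytes actually read
    Result := LStream.Read(AReadBack, SizeOf(AReadBack)) = SizeOf(AReadBack);
  finally
    LStream.Free;
  end;
end;

var
  LValue: Integer;
  LOk: Boolean;
begin
  LOk := RoundTrip(42, True, LValue);
  WriteLn('with rewind:    ok=', LOk, ' value=', LValue);
  LOk := RoundTrip(42, False, LValue);
  WriteLn('without rewind: ok=', LOk, ' value=', LValue);
end.
{code}

The same applies to any stream that is filled on the receiving side and then handed to a loader routine: the loader starts reading wherever the Position currently is.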

-- 
Remy Lebeau (TeamB)
0
Remy
7/27/2015 4:22:40 PM
I'll try setting the position of the stream back to zero, test it extensively & let you know how it turns out.

As an aside, instead of converting collections & stringlists (I use them also) to streams, can I just use Indy's RawToBytes & BytesToRaw to convert them to/from bytes directly & completely ignore the intermediate use of streams?

Jay Dee
0
Jay
7/27/2015 5:09:38 PM
Jay wrote:

> As an aside, instead of converting collections & stringlists (I use
> them also) to streams, can I just use Indy's RawToBytes &
> BytesToRaw to convert them to/from bytes directly & completely
> ignore the intermediate use of streams?

TIdIOHandler has methods for reading/writing TStrings data.  But you must 
serialize a TCollection to/from a flat format (which is what TReader/TWriter 
do for you).  Whether you decide to use a TStream or a byte array or whatever 
to transmit the serialized data, it doesn't really matter.  However, if you 
really want to read/write the serialized data over the socket using a byte 
array instead of a TStream, you could wrap the bytes in a TBytesStream when 
calling TReader/TWriter, at least.

RawToBytes()/BytesToRaw() simply copy bytes as-is between a byte array and 
a memory location.  That is not adequate enough by itself for proper serialization.
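
To make the TBytesStream suggestion concrete, here is a minimal sketch (assuming Free Pascal; PairToBytes and BytesToPair are hypothetical helpers, and the two fields merely stand in for one item of the collection). TWriter flattens the values into a stream whose contents are copied to a byte array, and on the other side the received bytes are wrapped in a TBytesStream so that TReader can parse them. For a whole TCollection, TWriter.WriteCollection and TReader.ReadCollection would play the same role, presumably inside SaveCollectionToStream and LoadCollectionFromStream, whose implementations are not shown in this thread.

{code}
program BytesStreamDemo;
{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}

uses
  Classes, SysUtils;

// Hypothetical helper: serializes an (id, name) pair into a byte array.
function PairToBytes(AId: Integer; const AName: string): TBytes;
var
  LStream: TMemoryStream;
  LWriter: TWriter;
  LSize: Integer;
begin
  Result := nil;
  LStream := TMemoryStream.Create;
  try
    LWriter := TWriter.Create(LStream, 1024);
    try
      LWriter.WriteInteger(AId);
      LWriter.WriteString(AName);
    finally
      // freeing the writer flushes its internal buffer into the stream
      LWriter.Free;
    end;
    LSize := LStream.Size;
    SetLength(Result, LSize);
    if LSize > 0 then
      Move(LStream.Memory^, Result[0], LSize);
  finally
    LStream.Free;
  end;
end;

// Hypothetical helper: reads the pair back by wrapping the bytes in a
// TBytesStream, so no intermediate copy into another stream is needed.
procedure BytesToPair(const ABytes: TBytes; out AId: Integer; out AName: string);
var
  LStream: TBytesStream;
  LReader: TReader;
begin
  LStream := TBytesStream.Create(ABytes);
  try
    LReader := TReader.Create(LStream, 1024);
    try
      AId := LReader.ReadInteger;
      AName := LReader.ReadString;
    finally
      LReader.Free;
    end;
  finally
    LStream.Free;
  end;
end;

var
  LBytes: TBytes;
  LId: Integer;
  LName: string;
begin
  LBytes := PairToBytes(7, 'Paris');
  BytesToPair(LBytes, LId, LName);
  WriteLn(Length(LBytes), ' bytes -> ', LId, ' ', LName);
end.
{code}

Note that the serialized form carries type markers and string lengths, which is exactly what a plain memory copy of the object would lack.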

-- 
Remy Lebeau (TeamB)
0
Remy
7/27/2015 6:15:40 PM
Thanks for your clarification. I did what you suggested (setting the stream position to zero before calling Listes) and got mixed results: sometimes it worked and sometimes it did not.

I then undefined the connection thread $DEFINE and voila! I was able to retrieve data from the server over 300 times in succession without fail! I re-enabled the connection thread and the problem returned. I now realize that the problem is thread deadlock.

I am now convinced that the normal threads and the connection thread are at loggerheads. The locks/unlocks in the code are just TCriticalSection.Enter and TCriticalSection.Leave. The shared resource is an IdTCPClient, which is why I use critical sections to protect it. Aside from the Execute() code above, I also use locks when sending data to the server, as follows:

{code}
    // Convert the protocol to bytes
    LBuffer := ProtocolToBytes(LProtocol);
    //
    Lock;
    //
    try
      //
      try
        //// clear any residual data that may be in the input buffer
        //if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and
        //  (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then
        //  ThreadTCPClient.IOHandler.InputBuffer.Clear;
        // Send the request to the server as bytes
        ThreadTCPClient.IOHandler.Write(LBuffer);
      except
        // Exception handling code here !
        On E: EIdSocketError do begin
          raise EAbort.Create(strProblemeDeConnection)
        end;
        On E: EIdConnClosedGracefully do
        begin
          raise EAbort.Create(strProblemeDeConnection)
        end;
        //
        On E: Exception do begin
          raise EAbort.Create(E.Message)
        end;
      end;
    finally
      //
      Unlock;
      //
      ClearBuffer(LBuffer);
    end;  // try....finally
{code}

I could abandon the idea of a HEARTBEAT connection thread but it is very useful & I'd like to keep it. What do you think I should do to avoid thread deadlock?

Thanks,

Jay Dee
0
Jay
7/27/2015 8:26:19 PM
Jay wrote:

> I could abandon the idea of a HEARTBEAT connection thread but it
> is very useful & I'd like to keep it. What do you think I should do to
> avoid thread deadlock?

Why are you not sending heartbeats on all of the connections?  Your protocol 
handlers are already set up to handle them, so why not just enable them always? 
It doesn't really make sense to have a dedicated connection thread just 
for heartbeats, when the other threads can (and should) handle their own 
local heartbeats.
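
One way to picture a per-thread heartbeat (a minimal sketch under assumptions, not the actual fix; HeartbeatDue is a hypothetical helper and the code assumes Free Pascal): each thread remembers when it last sent or received anything, and checks in its own loop whether the idle time has reached the interval. This mirrors the existing MinutesBetween(Now, FLastPingTime) >= 1 test, only applied per connection rather than in a separate thread.

{code}
program HeartbeatDemo;
{$IFDEF FPC}{$mode objfpc}{$H+}{$ENDIF}

uses
  SysUtils, DateUtils;

// Hypothetical helper: True when at least AIntervalMinutes whole minutes
// have passed since the connection last sent or received anything.
function HeartbeatDue(const ALastActivity, ANow: TDateTime;
  AIntervalMinutes: Integer): Boolean;
begin
  Result := MinutesBetween(ANow, ALastActivity) >= AIntervalMinutes;
end;

var
  LLast: TDateTime;
begin
  // fixed timestamps so the demonstration does not depend on the clock
  LLast := EncodeDateTime(2015, 7, 27, 10, 0, 0, 0);
  // only 30 seconds idle: no heartbeat needed yet
  WriteLn('30 s idle: ', HeartbeatDue(LLast, IncSecond(LLast, 30), 1));
  // 90 seconds idle: a full minute has passed, a heartbeat is due
  WriteLn('90 s idle: ', HeartbeatDue(LLast, IncSecond(LLast, 90), 1));
end.
{code}

In the thread's loop, a True result would be the point at which the thread itself writes a cmdKeepAlive request and resets its last-activity time, so no second thread ever touches the same connection.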

-- 
Remy Lebeau (TeamB)
0
Remy
7/27/2015 10:07:38 PM
Hi there Remy,

I did not put a heartbeat in all of the threads because the threads terminate on completion (FreeOnTerminate := True). If a client takes a coffee break or is doing something else on the computer, he is still connected, but the only thread that is active is the connection thread (FreeOnTerminate := False). That is why I kept the connection thread.

I even have a class variable FLastPingTime declared in the thread class, thinking that ALL threads could share this heartbeat, so technically FLastPingTime could be set to Now every time a thread sends data to the server. The question is how to account for pauses (no data request being sent or received) that could exceed 2 minutes (e.g. coffee breaks) while still connected to the server. I'm certain that solving this will solve the problem because, as I said in my last reply, once I undefine

{code}
//{$DEFINE CONN_THREAD}
{code}

everything works perfectly.

Thanks,

Jay Dee
0
Jay
7/28/2015 9:19:03 AM
Reply: